Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_stream_manager.h"
19
20
#include <fmt/format.h>
21
#include <glog/logging.h>
22
23
#include <algorithm>
24
#include <filesystem>
25
#include <memory>
26
#include <numeric>
27
#include <random>
28
#include <string>
29
30
#include "common/logging.h"
31
#include "common/metrics/doris_metrics.h"
32
#include "exec/spill/spill_stream.h"
33
#include "io/fs/file_system.h"
34
#include "io/fs/local_file_system.h"
35
#include "runtime/runtime_profile.h"
36
#include "runtime/runtime_state.h"
37
#include "storage/olap_define.h"
38
#include "util/parse_util.h"
39
#include "util/pretty_printer.h"
40
#include "util/time.h"
41
#include "util/uid_util.h"
42
43
namespace doris {
44
#include "common/compile_check_begin.h"
45
46
60
SpillStreamManager::~SpillStreamManager() {
47
60
    DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
48
60
}
49
SpillStreamManager::SpillStreamManager(
50
        std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map)
51
75
        : _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {}
52
53
75
Status SpillStreamManager::init() {
54
75
    LOG(INFO) << "init spill stream manager";
55
75
    RETURN_IF_ERROR(_init_spill_store_map());
56
57
79
    for (const auto& [path, store] : _spill_store_map) {
58
79
        auto gc_dir_root_dir = store->get_spill_data_gc_path();
59
79
        bool exists = true;
60
79
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists));
61
79
        if (!exists) {
62
15
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
63
15
        }
64
65
79
        auto spill_dir = store->get_spill_data_path();
66
79
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists));
67
79
        if (!exists) {
68
15
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
69
64
        } else {
70
64
            auto suffix = ToStringFromUnixMillis(UnixMillis());
71
64
            auto gc_dir = store->get_spill_data_gc_path(suffix);
72
64
            if (std::filesystem::exists(gc_dir)) {
73
0
                LOG(WARNING) << "gc dir already exists: " << gc_dir;
74
0
            }
75
64
            (void)io::global_local_filesystem()->rename(spill_dir, gc_dir);
76
64
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
77
64
        }
78
79
    }
79
80
75
    RETURN_IF_ERROR(Thread::create(
81
75
            "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); },
82
75
            &_spill_gc_thread));
83
75
    LOG(INFO) << "spill gc thread started";
84
85
75
    _init_metrics();
86
87
75
    return Status::OK();
88
75
}
89
90
75
void SpillStreamManager::_init_metrics() {
91
75
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("spill",
92
75
                                                                           {{"name", "spill"}});
93
94
75
    _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>(
95
75
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_write_bytes");
96
75
    _spill_write_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
97
75
            _spill_write_bytes_metric.get()));
98
99
75
    _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>(
100
75
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_read_bytes");
101
75
    _spill_read_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
102
75
            _spill_read_bytes_metric.get()));
103
75
}
104
105
// clean up stale spilled files
106
75
void SpillStreamManager::_spill_gc_thread_callback() {
107
14.7k
    while (!_stop_background_threads_latch.wait_for(
108
14.7k
            std::chrono::milliseconds(config::spill_gc_interval_ms))) {
109
14.6k
        gc(config::spill_gc_work_time_ms);
110
14.9k
        for (auto& [path, dir] : _spill_store_map) {
111
14.9k
            static_cast<void>(dir->update_capacity());
112
14.9k
        }
113
14.6k
    }
114
75
}
115
116
75
Status SpillStreamManager::_init_spill_store_map() {
117
79
    for (const auto& store : _spill_store_map) {
118
79
        RETURN_IF_ERROR(store.second->init());
119
79
    }
120
121
75
    return Status::OK();
122
75
}
123
124
std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
125
2.58k
        TStorageMedium::type storage_medium) {
126
2.58k
    std::vector<std::pair<SpillDataDir*, double>> stores_with_usage;
127
4.91k
    for (auto& [_, store] : _spill_store_map) {
128
4.91k
        if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) {
129
2.45k
            stores_with_usage.emplace_back(store.get(), store->_get_disk_usage(0));
130
2.45k
        }
131
4.91k
    }
132
2.58k
    if (stores_with_usage.empty()) {
133
131
        return {};
134
131
    }
135
136
2.45k
    std::sort(stores_with_usage.begin(), stores_with_usage.end(),
137
2.45k
              [](auto&& a, auto&& b) { return a.second < b.second; });
138
139
2.45k
    std::vector<SpillDataDir*> stores;
140
2.45k
    for (const auto& [store, _] : stores_with_usage) {
141
2.45k
        stores.emplace_back(store);
142
2.45k
    }
143
2.45k
    return stores;
144
2.58k
}
145
146
Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStreamSPtr& spill_stream,
147
                                                 const std::string& query_id,
148
                                                 const std::string& operator_name, int32_t node_id,
149
                                                 int32_t batch_rows, size_t batch_bytes,
150
2.45k
                                                 RuntimeProfile* operator_profile) {
151
2.45k
    auto data_dirs = _get_stores_for_spill(TStorageMedium::type::SSD);
152
2.45k
    if (data_dirs.empty()) {
153
131
        data_dirs = _get_stores_for_spill(TStorageMedium::type::HDD);
154
131
    }
155
2.45k
    if (data_dirs.empty()) {
156
0
        return Status::Error<ErrorCode::NO_AVAILABLE_ROOT_PATH>(
157
0
                "no available disk can be used for spill.");
158
0
    }
159
160
2.45k
    uint64_t id = id_++;
161
2.45k
    std::string spill_dir;
162
2.45k
    SpillDataDir* data_dir = nullptr;
163
2.45k
    for (auto& dir : data_dirs) {
164
2.45k
        std::string spill_root_dir = dir->get_spill_data_path();
165
        // storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id
166
2.45k
        spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id, operator_name,
167
2.45k
                                node_id, state->task_id(), id);
168
2.45k
        auto st = io::global_local_filesystem()->create_directory(spill_dir);
169
2.45k
        if (!st.ok()) {
170
0
            std::cerr << "create spill dir failed: " << st.to_string();
171
0
            continue;
172
0
        }
173
2.45k
        data_dir = dir;
174
2.45k
        break;
175
2.45k
    }
176
2.45k
    if (!data_dir) {
177
0
        return Status::Error<ErrorCode::CE_CMD_PARAMS_ERROR>(
178
0
                "there is no available disk that can be used to spill.");
179
0
    }
180
2.45k
    spill_stream = std::make_shared<SpillStream>(state, id, data_dir, spill_dir, batch_rows,
181
2.45k
                                                 batch_bytes, operator_profile);
182
2.45k
    RETURN_IF_ERROR(spill_stream->prepare());
183
2.45k
    return Status::OK();
184
2.45k
}
185
186
2.36k
void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
187
2.36k
    stream->gc();
188
2.36k
}
189
190
14.6k
void SpillStreamManager::gc(int32_t max_work_time_ms) {
191
14.6k
    bool exists = true;
192
14.6k
    bool has_work = false;
193
14.6k
    int64_t max_work_time_ns = max_work_time_ms * 1000L * 1000L;
194
14.6k
    MonotonicStopWatch watch;
195
14.6k
    watch.start();
196
14.6k
    Defer defer {[&]() {
197
14.5k
        if (has_work) {
198
77
            std::string msg(
199
77
                    fmt::format("spill gc time: {}",
200
77
                                PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS)));
201
77
            msg += ", spill storage:\n";
202
145
            for (const auto& [path, store_dir] : _spill_store_map) {
203
145
                msg += "    " + store_dir->debug_string();
204
145
                msg += "\n";
205
145
            }
206
77
            LOG(INFO) << msg;
207
77
        }
208
14.5k
    }};
209
14.9k
    for (const auto& [path, store_dir] : _spill_store_map) {
210
14.9k
        std::string gc_root_dir = store_dir->get_spill_data_gc_path();
211
212
14.9k
        std::error_code ec;
213
14.9k
        exists = std::filesystem::exists(gc_root_dir, ec);
214
14.9k
        if (ec || !exists) {
215
8.17k
            continue;
216
8.17k
        }
217
        // dirs of queries
218
6.76k
        std::vector<io::FileInfo> dirs;
219
6.76k
        auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists);
220
6.76k
        if (!st.ok()) {
221
0
            continue;
222
0
        }
223
224
20.7k
        for (const auto& dir : dirs) {
225
20.7k
            has_work = true;
226
20.7k
            if (dir.is_file) {
227
0
                continue;
228
0
            }
229
20.7k
            std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name);
230
            // operator spill sub dirs of a query
231
20.7k
            std::vector<io::FileInfo> files;
232
20.7k
            st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists);
233
20.7k
            if (!st.ok()) {
234
0
                continue;
235
0
            }
236
20.7k
            if (files.empty()) {
237
284
                static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
238
284
                continue;
239
284
            }
240
241
38.0k
            for (const auto& file : files) {
242
38.0k
                auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name);
243
38.0k
                if (file.is_file) {
244
0
                    static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
245
38.0k
                } else {
246
38.0k
                    static_cast<void>(
247
38.0k
                            io::global_local_filesystem()->delete_directory(abs_file_path));
248
38.0k
                }
249
38.0k
                if (watch.elapsed_time() > max_work_time_ns) {
250
20.3k
                    break;
251
20.3k
                }
252
38.0k
            }
253
20.5k
        }
254
6.76k
    }
255
14.6k
}
256
257
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_capacity, MetricUnit::BYTES);
258
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_limit, MetricUnit::BYTES);
259
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_avail_capacity, MetricUnit::BYTES);
260
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_data_size, MetricUnit::BYTES);
261
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_data, MetricUnit::BYTES);
262
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_gc_data, MetricUnit::BYTES);
263
264
SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
265
                           TStorageMedium::type storage_medium)
266
79
        : _path(std::move(path)),
267
79
          _disk_capacity_bytes(capacity_bytes),
268
79
          _storage_medium(storage_medium) {
269
79
    spill_data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
270
79
            std::string("spill_data_dir.") + _path, {{"path", _path + "/" + SPILL_DIR_PREFIX}});
271
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_capacity);
272
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_limit);
273
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_avail_capacity);
274
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_data_size);
275
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_data);
276
79
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_gc_data);
277
79
}
278
279
13.3k
bool is_directory_empty(const std::filesystem::path& dir) {
280
13.3k
    try {
281
13.3k
        return std::filesystem::is_directory(dir) &&
282
13.3k
               std::filesystem::directory_iterator(dir) ==
283
13.3k
                       std::filesystem::end(std::filesystem::directory_iterator {});
284
        // this method is not thread safe, the file referenced by directory_iterator
285
        // maybe moved to spill_gc dir during this function call, so need to catch expection
286
13.3k
    } catch (const std::filesystem::filesystem_error&) {
287
0
        return true;
288
0
    }
289
13.3k
}
290
291
79
Status SpillDataDir::init() {
292
79
    bool exists = false;
293
79
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &exists));
294
79
    if (!exists) {
295
0
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
296
0
                                       "check file exist failed");
297
0
    }
298
79
    RETURN_IF_ERROR(update_capacity());
299
79
    LOG(INFO) << fmt::format(
300
79
            "spill storage path: {}, capacity: {}, limit: {}, available: "
301
79
            "{}",
302
79
            _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
303
79
            PrettyPrinter::print_bytes(_spill_data_limit_bytes),
304
79
            PrettyPrinter::print_bytes(_available_bytes));
305
79
    return Status::OK();
306
79
}
307
308
9.20k
std::string SpillDataDir::get_spill_data_path(const std::string& query_id) const {
309
9.20k
    auto dir = fmt::format("{}/{}", _path, SPILL_DIR_PREFIX);
310
9.20k
    if (!query_id.empty()) {
311
0
        dir = fmt::format("{}/{}", dir, query_id);
312
0
    }
313
9.20k
    return dir;
314
9.20k
}
315
316
24.1k
std::string SpillDataDir::get_spill_data_gc_path(const std::string& sub_dir_name) const {
317
24.1k
    auto dir = fmt::format("{}/{}", _path, SPILL_GC_DIR_PREFIX);
318
24.1k
    if (!sub_dir_name.empty()) {
319
2.50k
        dir = fmt::format("{}/{}", dir, sub_dir_name);
320
2.50k
    }
321
24.1k
    return dir;
322
24.1k
}
323
324
15.0k
Status SpillDataDir::update_capacity() {
325
15.0k
    std::lock_guard<std::mutex> l(_mutex);
326
15.0k
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
327
15.0k
                                                                  &_available_bytes));
328
6.83k
    spill_disk_capacity->set_value(_disk_capacity_bytes);
329
6.83k
    spill_disk_avail_capacity->set_value(_available_bytes);
330
6.83k
    auto disk_use_max_bytes =
331
6.83k
            (int64_t)(_disk_capacity_bytes * config::storage_flood_stage_usage_percent / 100);
332
6.83k
    bool is_percent = true;
333
6.83k
    _spill_data_limit_bytes = ParseUtil::parse_mem_spec(config::spill_storage_limit, -1,
334
6.83k
                                                        _disk_capacity_bytes, &is_percent);
335
6.83k
    if (_spill_data_limit_bytes <= 0) {
336
0
        spill_disk_limit->set_value(_spill_data_limit_bytes);
337
0
        auto err_msg = fmt::format("Failed to parse spill storage limit from '{}'",
338
0
                                   config::spill_storage_limit);
339
0
        LOG(WARNING) << err_msg;
340
0
        return Status::InvalidArgument(err_msg);
341
0
    }
342
6.83k
    if (is_percent) {
343
6.67k
        _spill_data_limit_bytes = (int64_t)(_spill_data_limit_bytes *
344
6.67k
                                            config::storage_flood_stage_usage_percent / 100);
345
6.67k
    }
346
6.83k
    if (_spill_data_limit_bytes > disk_use_max_bytes) {
347
0
        _spill_data_limit_bytes = disk_use_max_bytes;
348
0
    }
349
6.83k
    spill_disk_limit->set_value(_spill_data_limit_bytes);
350
351
6.83k
    std::string spill_root_dir = get_spill_data_path();
352
6.83k
    std::string spill_gc_root_dir = get_spill_data_gc_path();
353
6.83k
    spill_disk_has_spill_data->set_value(is_directory_empty(spill_root_dir) ? 0 : 1);
354
6.83k
    spill_disk_has_spill_gc_data->set_value(is_directory_empty(spill_gc_root_dir) ? 0 : 1);
355
356
6.83k
    return Status::OK();
357
6.83k
}
358
359
14.4k
bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) {
360
14.4k
    double used_pct = _get_disk_usage(incoming_data_size);
361
14.4k
    int64_t left_bytes = _available_bytes - incoming_data_size;
362
14.4k
    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
363
14.4k
        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
364
0
        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
365
0
                     << ", left bytes: " << left_bytes << ", path: " << _path;
366
0
        return true;
367
0
    }
368
14.4k
    return false;
369
14.4k
}
370
14.4k
bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
371
14.4k
    std::lock_guard<std::mutex> l(_mutex);
372
14.4k
    if (_reach_disk_capacity_limit(incoming_data_size)) {
373
0
        return true;
374
0
    }
375
14.4k
    if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) {
376
0
        LOG_EVERY_T(WARNING, 1) << fmt::format(
377
0
                "spill data reach limit, path: {}, capacity: {}, limit: {}, used: {}, available: "
378
0
                "{}, "
379
0
                "incoming "
380
0
                "bytes: {}",
381
0
                _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
382
0
                PrettyPrinter::print_bytes(_spill_data_limit_bytes),
383
0
                PrettyPrinter::print_bytes(_spill_data_bytes),
384
0
                PrettyPrinter::print_bytes(_available_bytes),
385
0
                PrettyPrinter::print_bytes(incoming_data_size));
386
0
        return true;
387
0
    }
388
14.4k
    return false;
389
14.4k
}
390
145
std::string SpillDataDir::debug_string() {
391
145
    return fmt::format(
392
145
            "path: {}, capacity: {}, limit: {}, used: {}, available: "
393
145
            "{}",
394
145
            _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
395
145
            PrettyPrinter::print_bytes(_spill_data_limit_bytes),
396
145
            PrettyPrinter::print_bytes(_spill_data_bytes),
397
145
            PrettyPrinter::print_bytes(_available_bytes));
398
145
}
399
} // namespace doris