Coverage Report

Created: 2026-04-15 19:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_manager.h
Line
Count
Source
1
2
// Licensed to the Apache Software Foundation (ASF) under one
3
// or more contributor license agreements.  See the NOTICE file
4
// distributed with this work for additional information
5
// regarding copyright ownership.  The ASF licenses this file
6
// to you under the Apache License, Version 2.0 (the
7
// "License"); you may not use this file except in compliance
8
// with the License.  You may obtain a copy of the License at
9
//
10
//   http://www.apache.org/licenses/LICENSE-2.0
11
//
12
// Unless required by applicable law or agreed to in writing,
13
// software distributed under the License is distributed on an
14
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
// KIND, either express or implied.  See the License for the
16
// specific language governing permissions and limitations
17
// under the License.
18
19
#pragma once
20
#include <atomic>
21
#include <memory>
22
#include <mutex>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/metrics/metrics.h"
27
#include "exec/spill/spill_file.h"
28
#include "storage/options.h"
29
#include "util/threadpool.h"
30
31
namespace doris {
32
// Forward declarations of types whose full definitions live elsewhere, plus
// the aliases built on them. Only IntAtomicCounter (as a pointer member) is
// used by the classes visible below.
// NOTE(review): "common/metrics/metrics.h" is already included above, so the
// metric-related declarations may be redundant — confirm before removing.
class RuntimeProfile;
33
template <typename T>
34
class AtomicCounter;
35
using IntAtomicCounter = AtomicCounter<int64_t>;
36
template <typename T>
37
class AtomicGauge;
38
using UIntGauge = AtomicGauge<uint64_t>;
39
class MetricEntity;
40
struct MetricPrototype;
41
42
// Declared early so SpillDataDir can name it in its `friend` declaration.
class SpillFileManager;
43
// Bookkeeping for a single spill directory: its path, storage medium, the
// capacity/usage byte counters (guarded by _mutex) and the gauge pointers
// that the counters are published to.
class SpillDataDir {
44
public:
45
    // storage_medium defaults to TStorageMedium::HDD when not supplied.
    // The constructor body is defined outside this header.
    SpillDataDir(std::string path, int64_t capacity_bytes,
46
                 TStorageMedium::type storage_medium = TStorageMedium::HDD);
47
48
    Status init();
49
50
154
    // Returns a const reference to the stored path string.
    const std::string& path() const { return _path; }
51
52
    // query_id defaults to an empty string. Defined outside this header.
    std::string get_spill_data_path(const std::string& query_id = "") const;
53
54
    // sub_dir_name defaults to an empty string. Defined outside this header.
    std::string get_spill_data_gc_path(const std::string& sub_dir_name = "") const;
55
56
652
    // Returns the storage medium recorded for this directory.
    TStorageMedium::type storage_medium() const { return _storage_medium; }
57
58
    // Check whether the capacity limit would be reached after adding the incoming data.
59
    // Returns true if the limit is reached, false otherwise.
60
    bool reach_capacity_limit(int64_t incoming_data_size);
61
62
    Status update_capacity();
63
64
1.13k
    // Adds incoming_data_size to _spill_data_bytes while holding _mutex, then
    // publishes the new total to the spill_disk_data_size gauge.
    // NOTE(review): presumably called with a negative size when spill data is
    // released — confirm against callers.
    void update_spill_data_usage(int64_t incoming_data_size) {
65
1.13k
        std::lock_guard<std::mutex> l(_mutex);
66
1.13k
        _spill_data_bytes += incoming_data_size;
67
1.13k
        // NOTE(review): spill_disk_data_size defaults to nullptr and is
        // dereferenced unconditionally; presumably init() assigns it — confirm.
        spill_disk_data_size->set_value(_spill_data_bytes);
68
1.13k
    }
69
70
2
    // Returns _spill_data_bytes, read while holding _mutex.
    // Not const-qualified because it locks _mutex, which is not `mutable`.
    int64_t get_spill_data_bytes() {
71
2
        std::lock_guard<std::mutex> l(_mutex);
72
2
        return _spill_data_bytes;
73
2
    }
74
75
0
    // Returns _spill_data_limit_bytes, read while holding _mutex.
    // Not const-qualified because it locks _mutex, which is not `mutable`.
    int64_t get_spill_data_limit() {
76
0
        std::lock_guard<std::mutex> l(_mutex);
77
0
        return _spill_data_limit_bytes;
78
0
    }
79
80
    std::string debug_string();
81
82
private:
83
    bool _reach_disk_capacity_limit(int64_t incoming_data_size);
84
1.09k
    // Returns (capacity - available + incoming_data_size) / capacity as a
    // double, or 0 when the capacity is 0 so that no division by zero occurs.
    // NOTE(review): reads _disk_capacity_bytes and _available_bytes without
    // taking _mutex; presumably every caller already holds it — confirm.
    // NOTE(review): the numerator mixes size_t with int64_t, so on platforms
    // where size_t is 64-bit it is evaluated in unsigned arithmetic and would
    // wrap if _available_bytes ever exceeded _disk_capacity_bytes.
    double _get_disk_usage(int64_t incoming_data_size) const {
85
1.09k
        return _disk_capacity_bytes == 0
86
1.09k
                       ? 0
87
1.09k
                       : (double)(_disk_capacity_bytes - _available_bytes + incoming_data_size) /
88
1.09k
                                 (double)_disk_capacity_bytes;
89
1.09k
    }
90
91
    // Grants SpillFileManager access to the private members of this class.
    friend class SpillFileManager;
92
    std::string _path;
93
94
    // Protects _disk_capacity_bytes, _available_bytes, _spill_data_limit_bytes, _spill_data_bytes
95
    std::mutex _mutex;
96
    // the actual capacity of the disk of this data dir
97
    // NOTE(review): unlike the neighbouring fields this one has no default
    // initializer; presumably the constructor sets it — confirm.
    size_t _disk_capacity_bytes;
98
    int64_t _spill_data_limit_bytes = 0;
99
    // the actual available capacity of the disk of this data dir
100
    size_t _available_bytes = 0;
101
    // Running total maintained by update_spill_data_usage().
    int64_t _spill_data_bytes = 0;
102
    // NOTE(review): no default initializer; presumably set by the constructor — confirm.
    TStorageMedium::type _storage_medium;
103
104
    std::shared_ptr<MetricEntity> spill_data_dir_metric_entity;
105
    // Gauge pointers all default to nullptr; where they are assigned is not
    // visible in this header.
    IntGauge* spill_disk_capacity = nullptr;
106
    IntGauge* spill_disk_limit = nullptr;
107
    IntGauge* spill_disk_avail_capacity = nullptr;
108
    IntGauge* spill_disk_data_size = nullptr;
109
    // for test
110
    IntGauge* spill_disk_has_spill_data = nullptr;
111
    IntGauge* spill_disk_has_spill_gc_data = nullptr;
112
};
113
// Owns a map of SpillDataDir objects keyed by string, a GC thread handle with
// its stop latch, a counter for generating IDs, and counters for the number of
// spill bytes written and read.
class SpillFileManager {
114
public:
115
    ~SpillFileManager();
116
    // Takes the directory map by rvalue reference.
    // NOTE(review): this single-argument constructor is not marked `explicit`.
    SpillFileManager(
117
            std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map);
118
119
    Status init();
120
121
154
    // Counts down the stop latch, then joins the GC thread if one exists.
    // NOTE(review): presumably the latch is what tells
    // _spill_gc_thread_callback() to exit — confirm in the implementation.
    void stop() {
122
154
        _stop_background_threads_latch.count_down();
123
154
        if (_spill_gc_thread) {
124
154
            _spill_gc_thread->join();
125
154
        }
126
154
    }
127
128
    // Create SpillFile and register it
129
    // @param relative_path  Operator-formatted path under the spill root,
130
    //                       e.g. "query_id/sort-node_id-task_id-unique_id"
131
    Status create_spill_file(const std::string& relative_path, SpillFileSPtr& spill_file);
132
133
    /// Get a unique ID for constructing spill file paths.
    /// Post-increments the atomic counter, so the value before the increment
    /// is returned; the counter starts at 0.
134
292
    uint64_t next_id() { return id_++; }
135
136
    // Mark SpillFile for deletion; asynchronously delete spill files in the GC thread
137
    void delete_spill_file(SpillFileSPtr spill_file);
138
139
    void gc(int32_t max_work_time_ms);
140
141
700
    // Adds `bytes` to the spill-write counter.
    // NOTE(review): _spill_write_bytes_counter defaults to nullptr and is
    // dereferenced unconditionally; presumably _init_metrics() sets it — confirm.
    void update_spill_write_bytes(int64_t bytes) { _spill_write_bytes_counter->increment(bytes); }
142
143
331
    // Adds `bytes` to the spill-read counter.
    // NOTE(review): _spill_read_bytes_counter defaults to nullptr and is
    // dereferenced unconditionally; presumably _init_metrics() sets it — confirm.
    void update_spill_read_bytes(int64_t bytes) { _spill_read_bytes_counter->increment(bytes); }
144
145
private:
146
    void _init_metrics();
147
    Status _init_spill_store_map();
148
    void _spill_gc_thread_callback();
149
    std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);
150
151
    // Spill directories keyed by string; the map owns them via unique_ptr.
    // NOTE(review): the key is presumably the directory path — confirm.
    std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
152
153
    // Counted down by stop() before the GC thread is joined.
    CountDownLatch _stop_background_threads_latch;
154
    std::shared_ptr<Thread> _spill_gc_thread;
155
156
    // Source of the values returned by next_id().
    // NOTE(review): the trailing-underscore name differs from the
    // leading-underscore convention used by every other member here.
    std::atomic_uint64_t id_ = 0;
157
158
    std::shared_ptr<MetricEntity> _entity {nullptr};
159
160
    std::unique_ptr<doris::MetricPrototype> _spill_write_bytes_metric {nullptr};
161
    std::unique_ptr<doris::MetricPrototype> _spill_read_bytes_metric {nullptr};
162
163
    // Raw counter pointers; both default to nullptr.
    IntAtomicCounter* _spill_write_bytes_counter {nullptr};
164
    IntAtomicCounter* _spill_read_bytes_counter {nullptr};
165
};
166
} // namespace doris