Coverage Report

Created: 2026-03-19 11:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_manager.h
Line
Count
Source
1
2
// Licensed to the Apache Software Foundation (ASF) under one
3
// or more contributor license agreements.  See the NOTICE file
4
// distributed with this work for additional information
5
// regarding copyright ownership.  The ASF licenses this file
6
// to you under the Apache License, Version 2.0 (the
7
// "License"); you may not use this file except in compliance
8
// with the License.  You may obtain a copy of the License at
9
//
10
//   http://www.apache.org/licenses/LICENSE-2.0
11
//
12
// Unless required by applicable law or agreed to in writing,
13
// software distributed under the License is distributed on an
14
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
// KIND, either express or implied.  See the License for the
16
// specific language governing permissions and limitations
17
// under the License.
18
19
#pragma once
20
#include <atomic>
21
#include <memory>
22
#include <mutex>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/metrics/metrics.h"
27
#include "exec/spill/spill_file.h"
28
#include "storage/options.h"
29
#include "util/threadpool.h"
30
31
namespace doris {
32
#include "common/compile_check_begin.h"
33
// Forward declarations referenced by the spill classes declared below.

// Plain class/struct types.
class RuntimeProfile;
class MetricEntity;
struct MetricPrototype;

// Metric templates and the concrete instantiations used in this header.
template <typename T>
class AtomicCounter;
template <typename T>
class AtomicGauge;

using IntAtomicCounter = AtomicCounter<int64_t>;
using UIntGauge = AtomicGauge<uint64_t>;

// Declared ahead of SpillDataDir, which names it as a friend.
class SpillFileManager;
44
class SpillDataDir {
45
public:
46
    SpillDataDir(std::string path, int64_t capacity_bytes,
47
                 TStorageMedium::type storage_medium = TStorageMedium::HDD);
48
49
    Status init();
50
51
152
    const std::string& path() const { return _path; }
52
53
    std::string get_spill_data_path(const std::string& query_id = "") const;
54
55
    std::string get_spill_data_gc_path(const std::string& sub_dir_name = "") const;
56
57
662
    TStorageMedium::type storage_medium() const { return _storage_medium; }
58
59
    // check if the capacity reach the limit after adding the incoming data
60
    // return true if limit reached, otherwise, return false.
61
    bool reach_capacity_limit(int64_t incoming_data_size);
62
63
    Status update_capacity();
64
65
1.18k
    void update_spill_data_usage(int64_t incoming_data_size) {
66
1.18k
        std::lock_guard<std::mutex> l(_mutex);
67
1.18k
        _spill_data_bytes += incoming_data_size;
68
1.18k
        spill_disk_data_size->set_value(_spill_data_bytes);
69
1.18k
    }
70
71
2
    int64_t get_spill_data_bytes() {
72
2
        std::lock_guard<std::mutex> l(_mutex);
73
2
        return _spill_data_bytes;
74
2
    }
75
76
0
    int64_t get_spill_data_limit() {
77
0
        std::lock_guard<std::mutex> l(_mutex);
78
0
        return _spill_data_limit_bytes;
79
0
    }
80
81
    std::string debug_string();
82
83
private:
84
    bool _reach_disk_capacity_limit(int64_t incoming_data_size);
85
1.14k
    double _get_disk_usage(int64_t incoming_data_size) const {
86
1.14k
        return _disk_capacity_bytes == 0
87
1.14k
                       ? 0
88
1.14k
                       : (double)(_disk_capacity_bytes - _available_bytes + incoming_data_size) /
89
1.14k
                                 (double)_disk_capacity_bytes;
90
1.14k
    }
91
92
    friend class SpillFileManager;
93
    std::string _path;
94
95
    // protect _disk_capacity_bytes, _available_bytes, _spill_data_limit_bytes, _spill_data_bytes
96
    std::mutex _mutex;
97
    // the actual capacity of the disk of this data dir
98
    size_t _disk_capacity_bytes;
99
    int64_t _spill_data_limit_bytes = 0;
100
    // the actual available capacity of the disk of this data dir
101
    size_t _available_bytes = 0;
102
    int64_t _spill_data_bytes = 0;
103
    TStorageMedium::type _storage_medium;
104
105
    std::shared_ptr<MetricEntity> spill_data_dir_metric_entity;
106
    IntGauge* spill_disk_capacity = nullptr;
107
    IntGauge* spill_disk_limit = nullptr;
108
    IntGauge* spill_disk_avail_capacity = nullptr;
109
    IntGauge* spill_disk_data_size = nullptr;
110
    // for test
111
    IntGauge* spill_disk_has_spill_data = nullptr;
112
    IntGauge* spill_disk_has_spill_gc_data = nullptr;
113
};
114
class SpillFileManager {
115
public:
116
    ~SpillFileManager();
117
    SpillFileManager(
118
            std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map);
119
120
    Status init();
121
122
155
    void stop() {
123
155
        _stop_background_threads_latch.count_down();
124
155
        if (_spill_gc_thread) {
125
155
            _spill_gc_thread->join();
126
155
        }
127
155
    }
128
129
    // Create SpillFile and register it
130
    // @param relative_path  Operator-formatted path under the spill root,
131
    //                       e.g. "query_id/sort-node_id-task_id-unique_id"
132
    Status create_spill_file(const std::string& relative_path, SpillFileSPtr& spill_file);
133
134
    /// Get a unique ID for constructing spill file paths.
135
300
    uint64_t next_id() { return id_++; }
136
137
    // Mark SpillFile for deletion; asynchronously delete spill files in the GC thread
138
    void delete_spill_file(SpillFileSPtr spill_file);
139
140
    void gc(int32_t max_work_time_ms);
141
142
746
    void update_spill_write_bytes(int64_t bytes) { _spill_write_bytes_counter->increment(bytes); }
143
144
367
    void update_spill_read_bytes(int64_t bytes) { _spill_read_bytes_counter->increment(bytes); }
145
146
private:
147
    void _init_metrics();
148
    Status _init_spill_store_map();
149
    void _spill_gc_thread_callback();
150
    std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);
151
152
    std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
153
154
    CountDownLatch _stop_background_threads_latch;
155
    std::shared_ptr<Thread> _spill_gc_thread;
156
157
    std::atomic_uint64_t id_ = 0;
158
159
    std::shared_ptr<MetricEntity> _entity {nullptr};
160
161
    std::unique_ptr<doris::MetricPrototype> _spill_write_bytes_metric {nullptr};
162
    std::unique_ptr<doris::MetricPrototype> _spill_read_bytes_metric {nullptr};
163
164
    IntAtomicCounter* _spill_write_bytes_counter {nullptr};
165
    IntAtomicCounter* _spill_read_bytes_counter {nullptr};
166
};
167
} // namespace doris
168
#include "common/compile_check_end.h"