Coverage Report

Created: 2026-03-15 15:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_stream_manager.h
Line
Count
Source
1
2
// Licensed to the Apache Software Foundation (ASF) under one
3
// or more contributor license agreements.  See the NOTICE file
4
// distributed with this work for additional information
5
// regarding copyright ownership.  The ASF licenses this file
6
// to you under the Apache License, Version 2.0 (the
7
// "License"); you may not use this file except in compliance
8
// with the License.  You may obtain a copy of the License at
9
//
10
//   http://www.apache.org/licenses/LICENSE-2.0
11
//
12
// Unless required by applicable law or agreed to in writing,
13
// software distributed under the License is distributed on an
14
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
// KIND, either express or implied.  See the License for the
16
// specific language governing permissions and limitations
17
// under the License.
18
19
#pragma once
20
#include <atomic>
21
#include <memory>
22
#include <mutex>
23
#include <unordered_map>
24
#include <vector>
25
26
#include "common/metrics/metrics.h"
27
#include "exec/spill/spill_stream.h"
28
#include "storage/options.h"
29
#include "util/threadpool.h"
30
namespace doris {
31
#include "common/compile_check_begin.h"
32
class RuntimeProfile;
33
template <typename T>
34
class AtomicCounter;
35
using IntAtomicCounter = AtomicCounter<int64_t>;
36
template <typename T>
37
class AtomicGauge;
38
using UIntGauge = AtomicGauge<uint64_t>;
39
class MetricEntity;
40
struct MetricPrototype;
41
42
class SpillStreamManager;
43
class SpillDataDir {
44
public:
45
    SpillDataDir(std::string path, int64_t capacity_bytes,
46
                 TStorageMedium::type storage_medium = TStorageMedium::HDD);
47
48
    Status init();
49
50
57
    // Returns a reference to the path string stored in _path for this spill data dir.
    const std::string& path() const { return _path; }
51
52
    std::string get_spill_data_path(const std::string& query_id = "") const;
53
54
    std::string get_spill_data_gc_path(const std::string& sub_dir_name = "") const;
55
56
4.94k
    // Returns the storage medium recorded in _storage_medium for this data dir.
    TStorageMedium::type storage_medium() const { return _storage_medium; }
57
58
    // check if the capacity reaches the limit after adding the incoming data
59
    // return true if the limit is reached, otherwise return false.
60
    bool reach_capacity_limit(int64_t incoming_data_size);
61
62
    Status update_capacity();
63
64
12.4k
    // Adds incoming_data_size to the tracked spill byte total (_spill_data_bytes)
    // while holding _mutex, then publishes the new total through the
    // spill_disk_data_size gauge.
    //
    // incoming_data_size is signed; presumably callers pass a negative value
    // when spill data is released — TODO confirm against callers.
    void update_spill_data_usage(int64_t incoming_data_size) {
65
12.4k
        std::lock_guard<std::mutex> l(_mutex);
66
12.4k
        _spill_data_bytes += incoming_data_size;
67
12.4k
        // NOTE(review): spill_disk_data_size defaults to nullptr and is
        // dereferenced without a check; assumes the gauge has been registered
        // before this is called — confirm.
        spill_disk_data_size->set_value(_spill_data_bytes);
68
12.4k
    }
69
70
0
    // Returns the current value of _spill_data_bytes, read while holding _mutex.
    // Non-const because it locks the (non-mutable) _mutex member.
    int64_t get_spill_data_bytes() {
71
0
        std::lock_guard<std::mutex> l(_mutex);
72
0
        return _spill_data_bytes;
73
0
    }
74
75
0
    // Returns the current value of _spill_data_limit_bytes, read while holding
    // _mutex. Non-const because it locks the (non-mutable) _mutex member.
    int64_t get_spill_data_limit() {
76
0
        std::lock_guard<std::mutex> l(_mutex);
77
0
        return _spill_data_limit_bytes;
78
0
    }
79
80
    std::string debug_string();
81
82
private:
83
    bool _reach_disk_capacity_limit(int64_t incoming_data_size);
84
10.1k
    // Returns the fraction of the disk that would be in use after adding
    // incoming_data_size bytes:
    //     (_disk_capacity_bytes - _available_bytes + incoming_data_size) / _disk_capacity_bytes
    // Returns 0 when _disk_capacity_bytes is 0 so the division is never by zero.
    //
    // This function does not lock _mutex itself; presumably callers already
    // hold it, since the fields it reads are listed as protected by _mutex —
    // TODO confirm.
    //
    // NOTE(review): _disk_capacity_bytes and _available_bytes are size_t while
    // incoming_data_size is int64_t, so (where size_t is 64-bit) the numerator
    // is computed in unsigned arithmetic before the cast to double. If
    // _available_bytes could ever exceed _disk_capacity_bytes, or a negative
    // incoming_data_size could exceed the used bytes, the value would wrap to
    // a huge number — confirm this cannot happen.
    double _get_disk_usage(int64_t incoming_data_size) const {
85
10.1k
        return _disk_capacity_bytes == 0
86
10.1k
                       ? 0
87
10.1k
                       : (double)(_disk_capacity_bytes - _available_bytes + incoming_data_size) /
88
10.1k
                                 (double)_disk_capacity_bytes;
89
10.1k
    }
90
91
    friend class SpillStreamManager;
92
    std::string _path;
93
94
    // protect _disk_capacity_bytes, _available_bytes, _spill_data_limit_bytes, _spill_data_bytes
95
    std::mutex _mutex;
96
    // the actual capacity of the disk of this data dir
97
    size_t _disk_capacity_bytes;
98
    int64_t _spill_data_limit_bytes = 0;
99
    // the actual available capacity of the disk of this data dir
100
    size_t _available_bytes = 0;
101
    int64_t _spill_data_bytes = 0;
102
    TStorageMedium::type _storage_medium;
103
104
    std::shared_ptr<MetricEntity> spill_data_dir_metric_entity;
105
    IntGauge* spill_disk_capacity = nullptr;
106
    IntGauge* spill_disk_limit = nullptr;
107
    IntGauge* spill_disk_avail_capacity = nullptr;
108
    IntGauge* spill_disk_data_size = nullptr;
109
    // for test
110
    IntGauge* spill_disk_has_spill_data = nullptr;
111
    IntGauge* spill_disk_has_spill_gc_data = nullptr;
112
};
113
class SpillStreamManager {
114
public:
115
    ~SpillStreamManager();
116
    SpillStreamManager(
117
            std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map);
118
119
    Status init();
120
121
60
    // Counts down _stop_background_threads_latch and then, if the GC thread
    // object exists, blocks until it has been joined.
    void stop() {
122
60
        // Presumably the GC thread callback waits on this latch and exits once
        // it reaches zero — confirm in _spill_gc_thread_callback().
        _stop_background_threads_latch.count_down();
123
60
        // _spill_gc_thread may be null (e.g. if it was never created), so only
        // join when it is set.
        if (_spill_gc_thread) {
124
60
            _spill_gc_thread->join();
125
60
        }
126
60
    }
127
128
    // Create a SpillStream and register it
129
    Status register_spill_stream(RuntimeState* state, SpillStreamSPtr& spill_stream,
130
                                 const std::string& query_id, const std::string& operator_name,
131
                                 int32_t node_id, int32_t batch_rows, size_t batch_bytes,
132
                                 RuntimeProfile* operator_profile);
133
134
    // Mark the SpillStream for deletion; its spilled files on disk are deleted asynchronously in the GC thread
135
    void delete_spill_stream(SpillStreamSPtr spill_stream);
136
137
    void gc(int32_t max_work_time_ms);
138
139
7.62k
    // Adds `bytes` to the spill write-bytes counter.
    // NOTE(review): _spill_write_bytes_counter defaults to nullptr and is
    // dereferenced unchecked; assumes it has been set up (presumably by
    // _init_metrics()) before this is called — confirm.
    void update_spill_write_bytes(int64_t bytes) { _spill_write_bytes_counter->increment(bytes); }
140
141
7.52k
    // Adds `bytes` to the spill read-bytes counter.
    // NOTE(review): _spill_read_bytes_counter defaults to nullptr and is
    // dereferenced unchecked; assumes it has been set up (presumably by
    // _init_metrics()) before this is called — confirm.
    void update_spill_read_bytes(int64_t bytes) { _spill_read_bytes_counter->increment(bytes); }
142
143
private:
144
    void _init_metrics();
145
    Status _init_spill_store_map();
146
    void _spill_gc_thread_callback();
147
    std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium);
148
149
    std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map;
150
151
    CountDownLatch _stop_background_threads_latch;
152
    std::shared_ptr<Thread> _spill_gc_thread;
153
154
    std::atomic_uint64_t id_ = 0;
155
156
    std::shared_ptr<MetricEntity> _entity {nullptr};
157
158
    std::unique_ptr<doris::MetricPrototype> _spill_write_bytes_metric {nullptr};
159
    std::unique_ptr<doris::MetricPrototype> _spill_read_bytes_metric {nullptr};
160
161
    IntAtomicCounter* _spill_write_bytes_counter {nullptr};
162
    IntAtomicCounter* _spill_read_bytes_counter {nullptr};
163
};
164
} // namespace doris
165
#include "common/compile_check_end.h"