Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/spill_utils.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Metrics_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <functional>
26
#include <utility>
27
28
#include "exec/partitioner/partitioner.h"
29
#include "runtime/fragment_mgr.h"
30
#include "runtime/memory/mem_tracker_limiter.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_profile.h"
33
#include "runtime/runtime_state.h"
34
#include "runtime/thread_context.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
using SpillPartitionerType = Crc32HashPartitioner<SpillPartitionChannelIds>;
39
40
struct SpillContext {
41
    std::atomic_int running_tasks_count;
42
    TUniqueId query_id;
43
    std::function<void(SpillContext*)> all_tasks_finished_callback;
44
45
    SpillContext(int running_tasks_count_, TUniqueId query_id_,
46
                 std::function<void(SpillContext*)> all_tasks_finished_callback_)
47
806
            : running_tasks_count(running_tasks_count_),
48
806
              query_id(std::move(query_id_)),
49
806
              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
50
51
806
    ~SpillContext() {
52
806
        if (running_tasks_count.load() != 0) {
53
0
            LOG(WARNING) << "Query: " << print_id(query_id)
54
0
                         << " not all spill tasks finished, remaining tasks: "
55
0
                         << running_tasks_count.load();
56
0
        }
57
806
    }
58
59
806
    void on_task_finished() {
60
806
        auto count = running_tasks_count.fetch_sub(1);
61
806
        if (count == 1) {
62
806
            all_tasks_finished_callback(this);
63
806
        }
64
806
    }
65
};
66
67
class SpillRunnable {
68
protected:
69
    SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context,
70
                  RuntimeProfile* operator_profile, bool is_write,
71
                  std::function<Status()> spill_exec_func,
72
                  std::function<Status()> spill_fin_cb = {})
73
1.49k
            : _state(state),
74
1.49k
              _custom_profile(operator_profile->get_child("CustomCounters")),
75
1.49k
              _spill_context(std::move(spill_context)),
76
1.49k
              _is_write_task(is_write),
77
1.49k
              _spill_exec_func(std::move(spill_exec_func)),
78
1.49k
              _spill_fin_cb(std::move(spill_fin_cb)) {
79
1.49k
        RuntimeProfile* common_profile = operator_profile->get_child("CommonCounters");
80
1.49k
        DCHECK(common_profile != nullptr);
81
1.49k
        DCHECK(_custom_profile != nullptr);
82
1.49k
        _spill_total_timer = _custom_profile->get_counter("SpillTotalTime");
83
84
1.49k
        if (is_write) {
85
835
            _write_wait_in_queue_task_count =
86
835
                    _custom_profile->get_counter("SpillWriteTaskWaitInQueueCount");
87
835
            _writing_task_count = _custom_profile->get_counter("SpillWriteTaskCount");
88
835
            COUNTER_UPDATE(_write_wait_in_queue_task_count, 1);
89
835
        }
90
1.49k
    }
91
92
public:
93
1.49k
    virtual ~SpillRunnable() = default;
94
95
1.49k
    [[nodiscard]] Status run() {
96
1.49k
        SCOPED_TIMER(_spill_total_timer);
97
98
1.49k
        auto* spill_timer = _get_spill_timer();
99
1.49k
        DCHECK(spill_timer != nullptr);
100
1.49k
        SCOPED_TIMER(spill_timer);
101
102
1.49k
        _on_task_started();
103
104
1.49k
        Defer defer([&] {
105
1.49k
            {
106
1.49k
                std::function<Status()> tmp;
107
1.49k
                std::swap(tmp, _spill_exec_func);
108
1.49k
            }
109
1.49k
            {
110
1.49k
                std::function<Status()> tmp;
111
1.49k
                std::swap(tmp, _spill_fin_cb);
112
1.49k
            }
113
1.49k
        });
114
115
1.49k
        if (_state->is_cancelled()) {
116
0
            return _state->cancel_reason();
117
0
        }
118
119
1.49k
        RETURN_IF_ERROR(_spill_exec_func());
120
1.49k
        _on_task_finished();
121
1.49k
        if (_spill_fin_cb) {
122
2
            return _spill_fin_cb();
123
2
        }
124
125
1.48k
        return Status::OK();
126
1.49k
    }
127
128
protected:
129
1.49k
    virtual void _on_task_finished() {
130
1.49k
        if (_spill_context) {
131
806
            _spill_context->on_task_finished();
132
806
        }
133
1.49k
    }
134
135
835
    virtual RuntimeProfile::Counter* _get_spill_timer() {
136
835
        return _custom_profile->get_counter("SpillWriteTime");
137
835
    }
138
139
835
    virtual void _on_task_started() {
140
835
        VLOG_DEBUG << "Query: " << print_id(_state->query_id())
141
0
                   << " spill task started, pipeline task id: " << _state->task_id();
142
835
        if (_is_write_task) {
143
835
            COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
144
835
            COUNTER_UPDATE(_writing_task_count, 1);
145
835
        }
146
835
    }
147
148
    RuntimeState* _state;
149
    RuntimeProfile* _custom_profile;
150
    std::shared_ptr<SpillContext> _spill_context;
151
    bool _is_write_task;
152
153
private:
154
    RuntimeProfile::Counter* _spill_total_timer;
155
156
    RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr;
157
    RuntimeProfile::Counter* _writing_task_count = nullptr;
158
159
    std::function<Status()> _spill_exec_func;
160
    std::function<Status()> _spill_fin_cb;
161
};
162
163
class SpillSinkRunnable : public SpillRunnable {
164
public:
165
    SpillSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context,
166
                      RuntimeProfile* operator_profile, std::function<Status()> spill_exec_func,
167
                      std::function<Status()> spill_fin_cb = {})
168
833
            : SpillRunnable(state, spill_context, operator_profile, true, spill_exec_func,
169
833
                            spill_fin_cb) {}
170
};
171
172
class SpillNonSinkRunnable : public SpillRunnable {
173
public:
174
    SpillNonSinkRunnable(RuntimeState* state, RuntimeProfile* operator_profile,
175
                         std::function<Status()> spill_exec_func,
176
                         std::function<Status()> spill_fin_cb = {})
177
2
            : SpillRunnable(state, nullptr, operator_profile, true, spill_exec_func, spill_fin_cb) {
178
2
    }
179
};
180
181
class SpillRecoverRunnable : public SpillRunnable {
182
public:
183
    SpillRecoverRunnable(RuntimeState* state, RuntimeProfile* operator_profile,
184
                         std::function<Status()> spill_exec_func,
185
                         std::function<Status()> spill_fin_cb = {})
186
661
            : SpillRunnable(state, nullptr, operator_profile, false, spill_exec_func,
187
661
                            spill_fin_cb) {
188
661
        RuntimeProfile* custom_profile = operator_profile->get_child("CustomCounters");
189
661
        DCHECK(custom_profile != nullptr);
190
661
        _spill_revover_timer = custom_profile->get_counter("SpillRecoverTime");
191
661
        _read_wait_in_queue_task_count =
192
661
                custom_profile->get_counter("SpillReadTaskWaitInQueueCount");
193
661
        _reading_task_count = custom_profile->get_counter("SpillReadTaskCount");
194
195
661
        COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
196
661
    }
197
198
protected:
199
661
    RuntimeProfile::Counter* _get_spill_timer() override {
200
661
        return _custom_profile->get_counter("SpillRecoverTime");
201
661
    }
202
203
661
    void _on_task_started() override {
204
661
        VLOG_DEBUG << "SpillRecoverRunnable, Query: " << print_id(_state->query_id())
205
0
                   << " spill task started, pipeline task id: " << _state->task_id();
206
661
        COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
207
661
        COUNTER_UPDATE(_reading_task_count, 1);
208
661
    }
209
210
private:
211
    RuntimeProfile::Counter* _spill_revover_timer;
212
    RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
213
    RuntimeProfile::Counter* _reading_task_count = nullptr;
214
};
215
216
template <bool accumulating>
217
inline void update_profile_from_inner_profile(const std::string& name,
218
                                              RuntimeProfile* runtime_profile,
219
32.9k
                                              RuntimeProfile* inner_profile) {
220
32.9k
    auto* inner_counter = inner_profile->get_counter(name);
221
18.4E
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
222
32.9k
    if (inner_counter == nullptr) [[unlikely]] {
223
0
        return;
224
0
    }
225
32.9k
    auto* counter = runtime_profile->get_counter(name);
226
32.9k
    if (counter == nullptr) [[unlikely]] {
227
4.65k
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
228
4.65k
                                               inner_counter->level());
229
4.65k
    }
230
32.9k
    if constexpr (accumulating) {
231
        // Memory usage should not be accumulated.
232
16.7k
        if (counter->type() == TUnit::BYTES) {
233
4.96k
            counter->set(inner_counter->value());
234
11.7k
        } else {
235
11.7k
            counter->update(inner_counter->value());
236
11.7k
        }
237
16.7k
    } else {
238
16.2k
        counter->set(inner_counter->value());
239
16.2k
    }
240
32.9k
}
_ZN5doris33update_profile_from_inner_profileILb0EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
219
16.2k
                                              RuntimeProfile* inner_profile) {
220
16.2k
    auto* inner_counter = inner_profile->get_counter(name);
221
18.4E
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
222
16.2k
    if (inner_counter == nullptr) [[unlikely]] {
223
0
        return;
224
0
    }
225
16.2k
    auto* counter = runtime_profile->get_counter(name);
226
16.2k
    if (counter == nullptr) [[unlikely]] {
227
3.98k
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
228
3.98k
                                               inner_counter->level());
229
3.98k
    }
230
    if constexpr (accumulating) {
231
        // Memory usage should not be accumulated.
232
        if (counter->type() == TUnit::BYTES) {
233
            counter->set(inner_counter->value());
234
        } else {
235
            counter->update(inner_counter->value());
236
        }
237
16.2k
    } else {
238
16.2k
        counter->set(inner_counter->value());
239
16.2k
    }
240
16.2k
}
_ZN5doris33update_profile_from_inner_profileILb1EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_
Line
Count
Source
219
16.7k
                                              RuntimeProfile* inner_profile) {
220
16.7k
    auto* inner_counter = inner_profile->get_counter(name);
221
16.7k
    DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found";
222
16.7k
    if (inner_counter == nullptr) [[unlikely]] {
223
0
        return;
224
0
    }
225
16.7k
    auto* counter = runtime_profile->get_counter(name);
226
16.7k
    if (counter == nullptr) [[unlikely]] {
227
672
        counter = runtime_profile->add_counter(name, inner_counter->type(), "",
228
672
                                               inner_counter->level());
229
672
    }
230
16.7k
    if constexpr (accumulating) {
231
        // Memory usage should not be accumulated.
232
16.7k
        if (counter->type() == TUnit::BYTES) {
233
4.96k
            counter->set(inner_counter->value());
234
11.7k
        } else {
235
11.7k
            counter->update(inner_counter->value());
236
11.7k
        }
237
    } else {
238
        counter->set(inner_counter->value());
239
    }
240
16.7k
}
241
242
#include "common/compile_check_end.h"
243
} // namespace doris