be/src/exec/operator/spill_utils.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/Metrics_types.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <atomic> |
25 | | #include <functional> |
26 | | #include <utility> |
27 | | |
28 | | #include "exec/partitioner/partitioner.h" |
29 | | #include "runtime/fragment_mgr.h" |
30 | | #include "runtime/memory/mem_tracker_limiter.h" |
31 | | #include "runtime/query_context.h" |
32 | | #include "runtime/runtime_profile.h" |
33 | | #include "runtime/runtime_state.h" |
34 | | #include "runtime/thread_context.h" |
35 | | |
36 | | namespace doris { |
37 | | #include "common/compile_check_begin.h" |
// Partitioner type used by this spill code when splitting data into spill
// partitions. NOTE(review): the hashing scheme and channel-id semantics live in
// Crc32HashPartitioner / SpillPartitionChannelIds (exec/partitioner/partitioner.h)
// — confirm details there rather than inferring them from the names.
using SpillPartitionerType = Crc32HashPartitioner<SpillPartitionChannelIds>;
39 | | |
40 | | struct SpillContext { |
41 | | std::atomic_int running_tasks_count; |
42 | | TUniqueId query_id; |
43 | | std::function<void(SpillContext*)> all_tasks_finished_callback; |
44 | | |
45 | | SpillContext(int running_tasks_count_, TUniqueId query_id_, |
46 | | std::function<void(SpillContext*)> all_tasks_finished_callback_) |
47 | 806 | : running_tasks_count(running_tasks_count_), |
48 | 806 | query_id(std::move(query_id_)), |
49 | 806 | all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {} |
50 | | |
51 | 806 | ~SpillContext() { |
52 | 806 | if (running_tasks_count.load() != 0) { |
53 | 0 | LOG(WARNING) << "Query: " << print_id(query_id) |
54 | 0 | << " not all spill tasks finished, remaining tasks: " |
55 | 0 | << running_tasks_count.load(); |
56 | 0 | } |
57 | 806 | } |
58 | | |
59 | 806 | void on_task_finished() { |
60 | 806 | auto count = running_tasks_count.fetch_sub(1); |
61 | 806 | if (count == 1) { |
62 | 806 | all_tasks_finished_callback(this); |
63 | 806 | } |
64 | 806 | } |
65 | | }; |
66 | | |
67 | | class SpillRunnable { |
68 | | protected: |
69 | | SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context, |
70 | | RuntimeProfile* operator_profile, bool is_write, |
71 | | std::function<Status()> spill_exec_func, |
72 | | std::function<Status()> spill_fin_cb = {}) |
73 | 1.49k | : _state(state), |
74 | 1.49k | _custom_profile(operator_profile->get_child("CustomCounters")), |
75 | 1.49k | _spill_context(std::move(spill_context)), |
76 | 1.49k | _is_write_task(is_write), |
77 | 1.49k | _spill_exec_func(std::move(spill_exec_func)), |
78 | 1.49k | _spill_fin_cb(std::move(spill_fin_cb)) { |
79 | 1.49k | RuntimeProfile* common_profile = operator_profile->get_child("CommonCounters"); |
80 | 1.49k | DCHECK(common_profile != nullptr); |
81 | 1.49k | DCHECK(_custom_profile != nullptr); |
82 | 1.49k | _spill_total_timer = _custom_profile->get_counter("SpillTotalTime"); |
83 | | |
84 | 1.49k | if (is_write) { |
85 | 835 | _write_wait_in_queue_task_count = |
86 | 835 | _custom_profile->get_counter("SpillWriteTaskWaitInQueueCount"); |
87 | 835 | _writing_task_count = _custom_profile->get_counter("SpillWriteTaskCount"); |
88 | 835 | COUNTER_UPDATE(_write_wait_in_queue_task_count, 1); |
89 | 835 | } |
90 | 1.49k | } |
91 | | |
92 | | public: |
93 | 1.49k | virtual ~SpillRunnable() = default; |
94 | | |
95 | 1.49k | [[nodiscard]] Status run() { |
96 | 1.49k | SCOPED_TIMER(_spill_total_timer); |
97 | | |
98 | 1.49k | auto* spill_timer = _get_spill_timer(); |
99 | 1.49k | DCHECK(spill_timer != nullptr); |
100 | 1.49k | SCOPED_TIMER(spill_timer); |
101 | | |
102 | 1.49k | _on_task_started(); |
103 | | |
104 | 1.49k | Defer defer([&] { |
105 | 1.49k | { |
106 | 1.49k | std::function<Status()> tmp; |
107 | 1.49k | std::swap(tmp, _spill_exec_func); |
108 | 1.49k | } |
109 | 1.49k | { |
110 | 1.49k | std::function<Status()> tmp; |
111 | 1.49k | std::swap(tmp, _spill_fin_cb); |
112 | 1.49k | } |
113 | 1.49k | }); |
114 | | |
115 | 1.49k | if (_state->is_cancelled()) { |
116 | 0 | return _state->cancel_reason(); |
117 | 0 | } |
118 | | |
119 | 1.49k | RETURN_IF_ERROR(_spill_exec_func()); |
120 | 1.49k | _on_task_finished(); |
121 | 1.49k | if (_spill_fin_cb) { |
122 | 2 | return _spill_fin_cb(); |
123 | 2 | } |
124 | | |
125 | 1.48k | return Status::OK(); |
126 | 1.49k | } |
127 | | |
128 | | protected: |
129 | 1.49k | virtual void _on_task_finished() { |
130 | 1.49k | if (_spill_context) { |
131 | 806 | _spill_context->on_task_finished(); |
132 | 806 | } |
133 | 1.49k | } |
134 | | |
135 | 835 | virtual RuntimeProfile::Counter* _get_spill_timer() { |
136 | 835 | return _custom_profile->get_counter("SpillWriteTime"); |
137 | 835 | } |
138 | | |
139 | 835 | virtual void _on_task_started() { |
140 | 835 | VLOG_DEBUG << "Query: " << print_id(_state->query_id()) |
141 | 0 | << " spill task started, pipeline task id: " << _state->task_id(); |
142 | 835 | if (_is_write_task) { |
143 | 835 | COUNTER_UPDATE(_write_wait_in_queue_task_count, -1); |
144 | 835 | COUNTER_UPDATE(_writing_task_count, 1); |
145 | 835 | } |
146 | 835 | } |
147 | | |
148 | | RuntimeState* _state; |
149 | | RuntimeProfile* _custom_profile; |
150 | | std::shared_ptr<SpillContext> _spill_context; |
151 | | bool _is_write_task; |
152 | | |
153 | | private: |
154 | | RuntimeProfile::Counter* _spill_total_timer; |
155 | | |
156 | | RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr; |
157 | | RuntimeProfile::Counter* _writing_task_count = nullptr; |
158 | | |
159 | | std::function<Status()> _spill_exec_func; |
160 | | std::function<Status()> _spill_fin_cb; |
161 | | }; |
162 | | |
163 | | class SpillSinkRunnable : public SpillRunnable { |
164 | | public: |
165 | | SpillSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> spill_context, |
166 | | RuntimeProfile* operator_profile, std::function<Status()> spill_exec_func, |
167 | | std::function<Status()> spill_fin_cb = {}) |
168 | 833 | : SpillRunnable(state, spill_context, operator_profile, true, spill_exec_func, |
169 | 833 | spill_fin_cb) {} |
170 | | }; |
171 | | |
172 | | class SpillNonSinkRunnable : public SpillRunnable { |
173 | | public: |
174 | | SpillNonSinkRunnable(RuntimeState* state, RuntimeProfile* operator_profile, |
175 | | std::function<Status()> spill_exec_func, |
176 | | std::function<Status()> spill_fin_cb = {}) |
177 | 2 | : SpillRunnable(state, nullptr, operator_profile, true, spill_exec_func, spill_fin_cb) { |
178 | 2 | } |
179 | | }; |
180 | | |
181 | | class SpillRecoverRunnable : public SpillRunnable { |
182 | | public: |
183 | | SpillRecoverRunnable(RuntimeState* state, RuntimeProfile* operator_profile, |
184 | | std::function<Status()> spill_exec_func, |
185 | | std::function<Status()> spill_fin_cb = {}) |
186 | 661 | : SpillRunnable(state, nullptr, operator_profile, false, spill_exec_func, |
187 | 661 | spill_fin_cb) { |
188 | 661 | RuntimeProfile* custom_profile = operator_profile->get_child("CustomCounters"); |
189 | 661 | DCHECK(custom_profile != nullptr); |
190 | 661 | _spill_revover_timer = custom_profile->get_counter("SpillRecoverTime"); |
191 | 661 | _read_wait_in_queue_task_count = |
192 | 661 | custom_profile->get_counter("SpillReadTaskWaitInQueueCount"); |
193 | 661 | _reading_task_count = custom_profile->get_counter("SpillReadTaskCount"); |
194 | | |
195 | 661 | COUNTER_UPDATE(_read_wait_in_queue_task_count, 1); |
196 | 661 | } |
197 | | |
198 | | protected: |
199 | 661 | RuntimeProfile::Counter* _get_spill_timer() override { |
200 | 661 | return _custom_profile->get_counter("SpillRecoverTime"); |
201 | 661 | } |
202 | | |
203 | 661 | void _on_task_started() override { |
204 | 661 | VLOG_DEBUG << "SpillRecoverRunnable, Query: " << print_id(_state->query_id()) |
205 | 0 | << " spill task started, pipeline task id: " << _state->task_id(); |
206 | 661 | COUNTER_UPDATE(_read_wait_in_queue_task_count, -1); |
207 | 661 | COUNTER_UPDATE(_reading_task_count, 1); |
208 | 661 | } |
209 | | |
210 | | private: |
211 | | RuntimeProfile::Counter* _spill_revover_timer; |
212 | | RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr; |
213 | | RuntimeProfile::Counter* _reading_task_count = nullptr; |
214 | | }; |
215 | | |
216 | | template <bool accumulating> |
217 | | inline void update_profile_from_inner_profile(const std::string& name, |
218 | | RuntimeProfile* runtime_profile, |
219 | 32.9k | RuntimeProfile* inner_profile) { |
220 | 32.9k | auto* inner_counter = inner_profile->get_counter(name); |
221 | 18.4E | DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found"; |
222 | 32.9k | if (inner_counter == nullptr) [[unlikely]] { |
223 | 0 | return; |
224 | 0 | } |
225 | 32.9k | auto* counter = runtime_profile->get_counter(name); |
226 | 32.9k | if (counter == nullptr) [[unlikely]] { |
227 | 4.65k | counter = runtime_profile->add_counter(name, inner_counter->type(), "", |
228 | 4.65k | inner_counter->level()); |
229 | 4.65k | } |
230 | 32.9k | if constexpr (accumulating) { |
231 | | // Memory usage should not be accumulated. |
232 | 16.7k | if (counter->type() == TUnit::BYTES) { |
233 | 4.96k | counter->set(inner_counter->value()); |
234 | 11.7k | } else { |
235 | 11.7k | counter->update(inner_counter->value()); |
236 | 11.7k | } |
237 | 16.7k | } else { |
238 | 16.2k | counter->set(inner_counter->value()); |
239 | 16.2k | } |
240 | 32.9k | } _ZN5doris33update_profile_from_inner_profileILb0EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_ Line | Count | Source | 219 | 16.2k | RuntimeProfile* inner_profile) { | 220 | 16.2k | auto* inner_counter = inner_profile->get_counter(name); | 221 | 18.4E | DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found"; | 222 | 16.2k | if (inner_counter == nullptr) [[unlikely]] { | 223 | 0 | return; | 224 | 0 | } | 225 | 16.2k | auto* counter = runtime_profile->get_counter(name); | 226 | 16.2k | if (counter == nullptr) [[unlikely]] { | 227 | 3.98k | counter = runtime_profile->add_counter(name, inner_counter->type(), "", | 228 | 3.98k | inner_counter->level()); | 229 | 3.98k | } | 230 | | if constexpr (accumulating) { | 231 | | // Memory usage should not be accumulated. | 232 | | if (counter->type() == TUnit::BYTES) { | 233 | | counter->set(inner_counter->value()); | 234 | | } else { | 235 | | counter->update(inner_counter->value()); | 236 | | } | 237 | 16.2k | } else { | 238 | 16.2k | counter->set(inner_counter->value()); | 239 | 16.2k | } | 240 | 16.2k | } |
_ZN5doris33update_profile_from_inner_profileILb1EEEvRKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEPNS_14RuntimeProfileESA_ Line | Count | Source | 219 | 16.7k | RuntimeProfile* inner_profile) { | 220 | 16.7k | auto* inner_counter = inner_profile->get_counter(name); | 221 | 16.7k | DCHECK(inner_counter != nullptr) << "inner counter " << name << " not found"; | 222 | 16.7k | if (inner_counter == nullptr) [[unlikely]] { | 223 | 0 | return; | 224 | 0 | } | 225 | 16.7k | auto* counter = runtime_profile->get_counter(name); | 226 | 16.7k | if (counter == nullptr) [[unlikely]] { | 227 | 672 | counter = runtime_profile->add_counter(name, inner_counter->type(), "", | 228 | 672 | inner_counter->level()); | 229 | 672 | } | 230 | 16.7k | if constexpr (accumulating) { | 231 | | // Memory usage should not be accumulated. | 232 | 16.7k | if (counter->type() == TUnit::BYTES) { | 233 | 4.96k | counter->set(inner_counter->value()); | 234 | 11.7k | } else { | 235 | 11.7k | counter->update(inner_counter->value()); | 236 | 11.7k | } | 237 | | } else { | 238 | | counter->set(inner_counter->value()); | 239 | | } | 240 | 16.7k | } |
|
241 | | |
242 | | #include "common/compile_check_end.h" |
243 | | } // namespace doris |