be/src/exec/operator/data_queue.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/data_queue.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <utility> |
24 | | |
25 | | #include "common/thread_safety_annotations.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exec/pipeline/dependency.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | 173 | void SubQueue::try_pop(std::unique_ptr<Block>* output_block) { |
32 | 173 | LockGuard l(queue_lock); |
33 | 173 | if (!blocks.empty()) { |
34 | 169 | *output_block = std::move(blocks.front()); |
35 | 169 | blocks.pop_front(); |
36 | 169 | bytes_in_queue -= (*output_block)->allocated_bytes(); |
37 | 169 | blocks_in_queue -= 1; |
38 | 169 | if (blocks.empty()) { |
39 | 102 | sink_dependency->set_ready(); |
40 | 102 | } |
41 | 169 | } |
42 | 173 | } |
43 | | |
44 | 185 | bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter) { |
45 | 185 | LockGuard l(queue_lock); |
46 | 185 | if (is_finished) { |
47 | 1 | return false; |
48 | 1 | } |
49 | 184 | total_counter++; |
50 | 184 | bytes_in_queue += block->allocated_bytes(); |
51 | 184 | blocks.emplace_back(std::move(block)); |
52 | 184 | blocks_in_queue += 1; |
53 | 184 | if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) { |
54 | 75 | sink_dependency->block(); |
55 | 75 | } |
56 | 184 | return true; |
57 | 185 | } |
58 | | |
59 | | bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter, |
60 | 23 | std::atomic_bool& all_finished) { |
61 | 23 | LockGuard l(queue_lock); |
62 | 23 | if (is_finished) { |
63 | 2 | return false; |
64 | 2 | } |
65 | 21 | is_finished = true; |
66 | 21 | if (unfinished_counter.fetch_sub(1) == 1) { |
67 | 10 | all_finished = true; |
68 | 10 | } |
69 | 21 | return true; |
70 | 23 | } |
71 | | |
72 | 5 | void SubQueue::clear_blocks() { |
73 | 5 | bool need_set_always_ready = false; |
74 | 5 | { |
75 | 5 | LockGuard l(queue_lock); |
76 | 5 | if (!blocks.empty()) { |
77 | 3 | blocks.clear(); |
78 | 3 | bytes_in_queue = 0; |
79 | 3 | blocks_in_queue = 0; |
80 | 3 | need_set_always_ready = true; |
81 | 3 | } |
82 | 5 | } |
83 | | // Notify outside of queue_lock to keep lock ordering simple. |
84 | 5 | if (need_set_always_ready) { |
85 | 3 | sink_dependency->set_always_ready(); |
86 | 3 | } |
87 | 5 | } |
88 | | |
89 | 15 | DataQueue::DataQueue(int child_count) : _sub_queues(child_count), _child_count(child_count) { |
90 | 39 | for (auto& sub : _sub_queues) { |
91 | 39 | sub = std::make_unique<SubQueue>(); |
92 | 39 | } |
93 | 15 | _un_finished_counter = child_count; |
94 | 15 | } |
95 | | |
96 | 3 | bool DataQueue::has_more_data() const { |
97 | 3 | return _cur_blocks_total_nums.load() > 0; |
98 | 3 | } |
99 | | |
100 | | void DataQueue::set_source_dependency(std::shared_ptr<Dependency> source_dependency) |
101 | 15 | NO_THREAD_SAFETY_ANALYSIS { |
102 | 15 | _source_dependency = std::move(source_dependency); |
103 | 15 | } |
104 | | |
105 | 39 | void DataQueue::set_sink_dependency(Dependency* sink_dependency, int child_idx) { |
106 | 39 | _sub_queues[child_idx]->sink_dependency = sink_dependency; |
107 | 39 | } |
108 | | |
109 | 7 | void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) { |
110 | 15 | for (auto& sub : _sub_queues) { |
111 | 15 | sub->max_blocks_in_queue = max_blocks; |
112 | 15 | } |
113 | 7 | } |
114 | | |
115 | 1 | void DataQueue::set_low_memory_mode() { |
116 | 1 | _is_low_memory_mode = true; |
117 | 3 | for (auto& sub : _sub_queues) { |
118 | 3 | sub->max_blocks_in_queue = 1; |
119 | 3 | } |
120 | 1 | clear_free_blocks(); |
121 | 1 | } |
122 | | |
123 | 14 | std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) { |
124 | 14 | auto& sub = *_sub_queues[child_idx]; |
125 | 14 | { |
126 | 14 | LockGuard l(sub.free_lock); |
127 | 14 | if (!sub.free_blocks.empty()) { |
128 | 2 | auto block = std::move(sub.free_blocks.front()); |
129 | 2 | sub.free_blocks.pop_front(); |
130 | 2 | return block; |
131 | 2 | } |
132 | 14 | } |
133 | | |
134 | 12 | return Block::create_unique(); |
135 | 14 | } |
136 | | |
137 | 13 | void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) { |
138 | 13 | DCHECK(block->rows() == 0); |
139 | | |
140 | 13 | if (!_is_low_memory_mode) { |
141 | 12 | auto& sub = *_sub_queues[child_idx]; |
142 | 12 | LockGuard l(sub.free_lock); |
143 | 12 | sub.free_blocks.emplace_back(std::move(block)); |
144 | 12 | } |
145 | 13 | } |
146 | | |
147 | 2 | void DataQueue::clear_free_blocks() { |
148 | 6 | for (auto& sub : _sub_queues) { |
149 | 6 | LockGuard l(sub->free_lock); |
150 | 6 | std::deque<std::unique_ptr<Block>> tmp_queue; |
151 | 6 | sub->free_blocks.swap(tmp_queue); |
152 | 6 | } |
153 | 2 | } |
154 | | |
155 | 1 | void DataQueue::terminate() { |
156 | 4 | for (int i = 0; i < _child_count; ++i) { |
157 | 3 | set_finish(i); |
158 | 3 | _sub_queues[i]->clear_blocks(); |
159 | 3 | } |
160 | 1 | clear_free_blocks(); |
161 | 1 | } |
162 | | |
163 | | //check which queue have data, and save the idx in _flag_queue_idx, |
164 | | //so next loop, will check the record idx + 1 first |
165 | | //maybe it's useful with many queue, others maybe always 0 |
166 | 172 | bool DataQueue::remaining_has_data() { |
167 | 172 | int count = _child_count; |
168 | 220 | while (--count >= 0) { |
169 | 208 | _flag_queue_idx++; |
170 | 208 | if (_flag_queue_idx == _child_count) { |
171 | 69 | _flag_queue_idx = 0; |
172 | 69 | } |
173 | 208 | if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) { |
174 | 160 | return true; |
175 | 160 | } |
176 | 208 | } |
177 | 12 | return false; |
178 | 172 | } |
179 | | |
180 | | //the _flag_queue_idx indicate which queue has data, and in check can_read |
181 | | //will be set idx in remaining_has_data function |
182 | 167 | Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) { |
183 | 167 | const int idx = _flag_queue_idx; |
184 | 167 | auto& sub = *_sub_queues[idx]; |
185 | | |
186 | 167 | sub.try_pop(output_block); |
187 | 167 | if (*output_block) { |
188 | 164 | if (child_idx) { |
189 | 164 | *child_idx = idx; |
190 | 164 | } |
191 | 164 | auto old_total = _cur_blocks_total_nums.fetch_sub(1); |
192 | 164 | if (old_total == 1) { |
193 | 13 | set_source_block(); |
194 | 13 | } |
195 | 164 | } |
196 | 167 | return Status::OK(); |
197 | 167 | } |
198 | | |
199 | 174 | Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) { |
200 | 174 | if (!block) { |
201 | 0 | return Status::OK(); |
202 | 0 | } |
203 | 174 | auto& sub = *_sub_queues[child_idx]; |
204 | | // total_counter is incremented inside try_push under queue_lock, only when the |
205 | | // block is actually enqueued. This ensures get_block_from_queue() always observes |
206 | | // _cur_blocks_total_nums >= 1 when it successfully pops a block, with no risk of |
207 | | // underflow or the need for a rollback on failure. |
208 | 174 | if (!sub.try_push(std::move(block), _cur_blocks_total_nums)) { |
209 | 0 | return Status::EndOfFile("SubQueue already finished"); |
210 | 0 | } |
211 | 174 | set_source_ready(); |
212 | 174 | return Status::OK(); |
213 | 174 | } |
214 | | |
215 | 19 | void DataQueue::set_finish(int child_idx) { |
216 | 19 | auto& sub = *_sub_queues[child_idx]; |
217 | 19 | if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) { |
218 | 1 | return; |
219 | 1 | } |
220 | 18 | set_source_ready(); |
221 | 18 | } |
222 | | |
223 | 25 | bool DataQueue::is_all_finish() { |
224 | 25 | return _is_all_finished; |
225 | 25 | } |
226 | | |
227 | 192 | void DataQueue::set_source_ready() { |
228 | 192 | LockGuard lc(_source_lock); |
229 | 192 | if (_source_dependency) { |
230 | 192 | _source_dependency->set_ready(); |
231 | 192 | } |
232 | 192 | } |
233 | | |
234 | 13 | void DataQueue::set_source_block() { |
235 | | // Re-check under _source_lock to avoid blocking the source when a concurrent push |
236 | | // has already added new blocks (or all children have finished) since we observed |
237 | | // the counter drop to zero. |
238 | 13 | LockGuard lc(_source_lock); |
239 | 13 | if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish()) { |
240 | 6 | _source_dependency->block(); |
241 | 6 | } |
242 | 13 | } |
243 | | |
244 | | } // namespace doris |