be/src/exec/operator/data_queue.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/data_queue.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <utility> |
24 | | |
25 | | #include "common/thread_safety_annotations.h" |
26 | | #include "core/block/block.h" |
27 | | #include "exec/pipeline/dependency.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | 41.1k | void SubQueue::try_pop(std::unique_ptr<Block>* output_block) { |
32 | 41.1k | LockGuard l(queue_lock); |
33 | 41.1k | if (!blocks.empty()) { |
34 | 8.43k | *output_block = std::move(blocks.front()); |
35 | 8.43k | blocks.pop_front(); |
36 | 8.43k | bytes_in_queue -= (*output_block)->allocated_bytes(); |
37 | 8.43k | blocks_in_queue -= 1; |
38 | 8.43k | if (blocks.empty()) { |
39 | 8.36k | sink_dependency->set_ready(); |
40 | 8.36k | } |
41 | 8.43k | } |
42 | 41.1k | } |
43 | | |
44 | 8.43k | bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter) { |
45 | 8.43k | LockGuard l(queue_lock); |
46 | 8.43k | if (is_finished) { |
47 | 1 | return false; |
48 | 1 | } |
49 | 8.43k | total_counter++; |
50 | 8.43k | bytes_in_queue += block->allocated_bytes(); |
51 | 8.43k | blocks.emplace_back(std::move(block)); |
52 | 8.43k | blocks_in_queue += 1; |
53 | 8.43k | if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) { |
54 | 76 | sink_dependency->block(); |
55 | 76 | } |
56 | 8.43k | return true; |
57 | 8.43k | } |
58 | | |
59 | | bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter, |
60 | 16.5k | std::atomic_bool& all_finished) { |
61 | 16.5k | LockGuard l(queue_lock); |
62 | 16.5k | if (is_finished) { |
63 | 8.26k | return false; |
64 | 8.26k | } |
65 | 8.30k | is_finished = true; |
66 | 8.30k | if (unfinished_counter.fetch_sub(1) == 1) { |
67 | 4.08k | all_finished = true; |
68 | 4.08k | } |
69 | 8.30k | return true; |
70 | 16.5k | } |
71 | | |
72 | 8.28k | void SubQueue::clear_blocks() { |
73 | 8.28k | bool need_set_always_ready = false; |
74 | 8.28k | { |
75 | 8.28k | LockGuard l(queue_lock); |
76 | 8.28k | if (!blocks.empty()) { |
77 | 10 | blocks.clear(); |
78 | 10 | bytes_in_queue = 0; |
79 | 10 | blocks_in_queue = 0; |
80 | 10 | need_set_always_ready = true; |
81 | 10 | } |
82 | 8.28k | } |
83 | | // Notify outside of queue_lock to keep lock ordering simple. |
84 | 8.28k | if (need_set_always_ready) { |
85 | 10 | sink_dependency->set_always_ready(); |
86 | 10 | } |
87 | 8.28k | } |
88 | | |
89 | 4.06k | DataQueue::DataQueue(int child_count) : _sub_queues(child_count), _child_count(child_count) { |
90 | 8.28k | for (auto& sub : _sub_queues) { |
91 | 8.28k | sub = std::make_unique<SubQueue>(); |
92 | 8.28k | } |
93 | 4.06k | _un_finished_counter = child_count; |
94 | 4.06k | } |
95 | | |
96 | 3 | bool DataQueue::has_more_data() const { |
97 | 3 | return _cur_blocks_total_nums.load() > 0; |
98 | 3 | } |
99 | | |
100 | | void DataQueue::set_source_dependency(std::shared_ptr<Dependency> source_dependency) |
101 | 3.97k | NO_THREAD_SAFETY_ANALYSIS { |
102 | 3.97k | _source_dependency = std::move(source_dependency); |
103 | 3.97k | } |
104 | | |
105 | 8.18k | void DataQueue::set_sink_dependency(Dependency* sink_dependency, int child_idx) { |
106 | 8.18k | _sub_queues[child_idx]->sink_dependency = sink_dependency; |
107 | 8.18k | } |
108 | | |
109 | 8.30k | void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) { |
110 | 18.4k | for (auto& sub : _sub_queues) { |
111 | 18.4k | sub->max_blocks_in_queue = max_blocks; |
112 | 18.4k | } |
113 | 8.30k | } |
114 | | |
115 | 1 | void DataQueue::set_low_memory_mode() { |
116 | 1 | _is_low_memory_mode = true; |
117 | 3 | for (auto& sub : _sub_queues) { |
118 | 3 | sub->max_blocks_in_queue = 1; |
119 | 3 | } |
120 | 1 | clear_free_blocks(); |
121 | 1 | } |
122 | | |
123 | 8.28k | std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) { |
124 | 8.28k | auto& sub = *_sub_queues[child_idx]; |
125 | 8.28k | { |
126 | 8.28k | LockGuard l(sub.free_lock); |
127 | 8.28k | if (!sub.free_blocks.empty()) { |
128 | 3 | auto block = std::move(sub.free_blocks.front()); |
129 | 3 | sub.free_blocks.pop_front(); |
130 | 3 | return block; |
131 | 3 | } |
132 | 8.28k | } |
133 | | |
134 | 8.27k | return Block::create_unique(); |
135 | 8.28k | } |
136 | | |
137 | 8.26k | void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) { |
138 | 8.26k | DCHECK(block->rows() == 0); |
139 | | |
140 | 8.26k | if (!_is_low_memory_mode) { |
141 | 8.26k | auto& sub = *_sub_queues[child_idx]; |
142 | 8.26k | LockGuard l(sub.free_lock); |
143 | 8.26k | sub.free_blocks.emplace_back(std::move(block)); |
144 | 8.26k | } |
145 | 8.26k | } |
146 | | |
147 | 4.05k | void DataQueue::clear_free_blocks() { |
148 | 8.29k | for (auto& sub : _sub_queues) { |
149 | 8.29k | LockGuard l(sub->free_lock); |
150 | 8.29k | std::deque<std::unique_ptr<Block>> tmp_queue; |
151 | 8.29k | sub->free_blocks.swap(tmp_queue); |
152 | 8.29k | } |
153 | 4.05k | } |
154 | | |
155 | 4.05k | void DataQueue::terminate() { |
156 | 12.3k | for (int i = 0; i < _child_count; ++i) { |
157 | 8.29k | set_finish(i); |
158 | 8.29k | _sub_queues[i]->clear_blocks(); |
159 | 8.29k | } |
160 | 4.05k | clear_free_blocks(); |
161 | 4.05k | } |
162 | | |
163 | | //check which queue have data, and save the idx in _flag_queue_idx, |
164 | | //so next loop, will check the record idx + 1 first |
165 | | //maybe it's useful with many queue, others maybe always 0 |
166 | 49.2k | bool DataQueue::remaining_has_data() { |
167 | 49.2k | int count = _child_count; |
168 | 135k | while (--count >= 0) { |
169 | 94.5k | _flag_queue_idx++; |
170 | 94.5k | if (_flag_queue_idx == _child_count) { |
171 | 45.0k | _flag_queue_idx = 0; |
172 | 45.0k | } |
173 | 94.5k | if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) { |
174 | 8.42k | return true; |
175 | 8.42k | } |
176 | 94.5k | } |
177 | 40.7k | return false; |
178 | 49.2k | } |
179 | | |
180 | | //the _flag_queue_idx indicate which queue has data, and in check can_read |
181 | | //will be set idx in remaining_has_data function |
182 | 41.1k | Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) { |
183 | 41.1k | const int idx = _flag_queue_idx; |
184 | 41.1k | auto& sub = *_sub_queues[idx]; |
185 | | |
186 | 41.1k | sub.try_pop(output_block); |
187 | 41.1k | if (*output_block) { |
188 | 8.42k | if (child_idx) { |
189 | 8.42k | *child_idx = idx; |
190 | 8.42k | } |
191 | 8.42k | auto old_total = _cur_blocks_total_nums.fetch_sub(1); |
192 | 8.42k | if (old_total == 1) { |
193 | 8.03k | set_source_block(); |
194 | 8.03k | } |
195 | 8.42k | } |
196 | 41.1k | return Status::OK(); |
197 | 41.1k | } |
198 | | |
199 | 8.41k | Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) { |
200 | 8.41k | if (!block) { |
201 | 0 | return Status::OK(); |
202 | 0 | } |
203 | 8.41k | auto& sub = *_sub_queues[child_idx]; |
204 | | // total_counter is incremented inside try_push under queue_lock, only when the |
205 | | // block is actually enqueued. This ensures get_block_from_queue() always observes |
206 | | // _cur_blocks_total_nums >= 1 when it successfully pops a block, with no risk of |
207 | | // underflow or the need for a rollback on failure. |
208 | 8.41k | if (!sub.try_push(std::move(block), _cur_blocks_total_nums)) { |
209 | 0 | return Status::EndOfFile("SubQueue already finished"); |
210 | 0 | } |
211 | 8.41k | set_source_ready(); |
212 | 8.41k | return Status::OK(); |
213 | 8.41k | } |
214 | | |
215 | 16.5k | void DataQueue::set_finish(int child_idx) { |
216 | 16.5k | auto& sub = *_sub_queues[child_idx]; |
217 | 16.5k | if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) { |
218 | 8.26k | return; |
219 | 8.26k | } |
220 | 8.30k | set_source_ready(); |
221 | 8.30k | } |
222 | | |
223 | 44.8k | bool DataQueue::is_all_finish() { |
224 | 44.8k | return _is_all_finished; |
225 | 44.8k | } |
226 | | |
227 | 16.7k | void DataQueue::set_source_ready() { |
228 | 16.7k | LockGuard lc(_source_lock); |
229 | 16.7k | if (_source_dependency) { |
230 | 16.7k | _source_dependency->set_ready(); |
231 | 16.7k | } |
232 | 16.7k | } |
233 | | |
234 | 8.03k | void DataQueue::set_source_block() { |
235 | | // Re-check under _source_lock to avoid blocking the source when a concurrent push |
236 | | // has already added new blocks (or all children have finished) since we observed |
237 | | // the counter drop to zero. |
238 | 8.03k | LockGuard lc(_source_lock); |
239 | 8.03k | if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish()) { |
240 | 4.01k | _source_dependency->block(); |
241 | 4.01k | } |
242 | 8.03k | } |
243 | | |
244 | | } // namespace doris |