Coverage Report

Created: 2026-05-09 10:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/data_queue.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
#include <utility>
24
25
#include "common/thread_safety_annotations.h"
26
#include "core/block/block.h"
27
#include "exec/pipeline/dependency.h"
28
29
namespace doris {
30
31
11.9k
void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
32
11.9k
    LockGuard l(queue_lock);
33
11.9k
    if (!blocks.empty()) {
34
7.73k
        *output_block = std::move(blocks.front());
35
7.73k
        blocks.pop_front();
36
7.73k
        bytes_in_queue -= (*output_block)->allocated_bytes();
37
7.73k
        blocks_in_queue -= 1;
38
7.73k
        if (blocks.empty()) {
39
7.67k
            sink_dependency->set_ready();
40
7.67k
        }
41
7.73k
    }
42
11.9k
}
43
44
7.76k
bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter) {
45
7.76k
    LockGuard l(queue_lock);
46
7.76k
    if (is_finished) {
47
6
        return false;
48
6
    }
49
7.75k
    total_counter++;
50
7.75k
    bytes_in_queue += block->allocated_bytes();
51
7.75k
    blocks.emplace_back(std::move(block));
52
7.75k
    blocks_in_queue += 1;
53
7.75k
    if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
54
74
        sink_dependency->block();
55
74
    }
56
7.75k
    return true;
57
7.76k
}
58
59
bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter,
60
15.1k
                             std::atomic_bool& all_finished) {
61
15.1k
    LockGuard l(queue_lock);
62
15.1k
    if (is_finished) {
63
7.57k
        return false;
64
7.57k
    }
65
7.60k
    is_finished = true;
66
7.60k
    if (unfinished_counter.fetch_sub(1) == 1) {
67
3.65k
        all_finished = true;
68
3.65k
    }
69
7.60k
    return true;
70
15.1k
}
71
72
7.57k
void SubQueue::clear_blocks() {
73
7.57k
    bool need_set_always_ready = false;
74
7.57k
    {
75
7.57k
        LockGuard l(queue_lock);
76
7.57k
        if (!blocks.empty()) {
77
15
            blocks.clear();
78
15
            bytes_in_queue = 0;
79
15
            blocks_in_queue = 0;
80
15
            need_set_always_ready = true;
81
15
        }
82
7.57k
    }
83
    // Notify outside of queue_lock to keep lock ordering simple.
84
7.57k
    if (need_set_always_ready) {
85
15
        sink_dependency->set_always_ready();
86
15
    }
87
7.57k
}
88
89
3.65k
// Builds a queue with one SubQueue per child and records `child_count` as the
// number of children that have not finished yet.
DataQueue::DataQueue(int child_count) : _sub_queues(child_count), _child_count(child_count) {
    // The member initializer only sizes the container; each slot still needs
    // its own SubQueue instance.
    for (size_t slot = 0; slot < _sub_queues.size(); ++slot) {
        _sub_queues[slot] = std::make_unique<SubQueue>();
    }
    _un_finished_counter.store(child_count);
}
95
96
3
bool DataQueue::has_more_data() const {
97
3
    return _cur_blocks_total_nums.load() > 0;
98
3
}
99
100
void DataQueue::set_source_dependency(std::shared_ptr<Dependency> source_dependency)
101
3.63k
        NO_THREAD_SAFETY_ANALYSIS {
102
3.63k
    _source_dependency = std::move(source_dependency);
103
3.63k
}
104
105
7.60k
// Stores `sink_dependency` as the sink dependency of the sub-queue at
// `child_idx`. The pointer is stored as-is; `child_idx` is not range-checked.
void DataQueue::set_sink_dependency(Dependency* sink_dependency, int child_idx) {
    SubQueue& target = *_sub_queues[child_idx];
    target.sink_dependency = sink_dependency;
}
108
109
7.60k
void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) {
110
17.4k
    for (auto& sub : _sub_queues) {
111
17.4k
        sub->max_blocks_in_queue = max_blocks;
112
17.4k
    }
113
7.60k
}
114
115
1
void DataQueue::set_low_memory_mode() {
116
1
    _is_low_memory_mode = true;
117
3
    for (auto& sub : _sub_queues) {
118
3
        sub->max_blocks_in_queue = 1;
119
3
    }
120
1
    clear_free_blocks();
121
1
}
122
123
7.59k
std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
124
7.59k
    auto& sub = *_sub_queues[child_idx];
125
7.59k
    {
126
7.59k
        LockGuard l(sub.free_lock);
127
7.59k
        if (!sub.free_blocks.empty()) {
128
4
            auto block = std::move(sub.free_blocks.front());
129
4
            sub.free_blocks.pop_front();
130
4
            return block;
131
4
        }
132
7.59k
    }
133
134
7.59k
    return Block::create_unique();
135
7.59k
}
136
137
7.57k
void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) {
138
7.57k
    DCHECK(block->rows() == 0);
139
140
7.57k
    if (!_is_low_memory_mode) {
141
7.56k
        auto& sub = *_sub_queues[child_idx];
142
7.56k
        LockGuard l(sub.free_lock);
143
7.56k
        sub.free_blocks.emplace_back(std::move(block));
144
7.56k
    }
145
7.57k
}
146
147
3.62k
void DataQueue::clear_free_blocks() {
148
7.58k
    for (auto& sub : _sub_queues) {
149
7.58k
        LockGuard l(sub->free_lock);
150
7.58k
        std::deque<std::unique_ptr<Block>> tmp_queue;
151
7.58k
        sub->free_blocks.swap(tmp_queue);
152
7.58k
    }
153
3.62k
}
154
155
3.62k
void DataQueue::terminate() {
156
11.2k
    for (int i = 0; i < _child_count; ++i) {
157
7.58k
        set_finish(i);
158
7.58k
        _sub_queues[i]->clear_blocks();
159
7.58k
    }
160
3.62k
    clear_free_blocks();
161
3.62k
}
162
163
//check which queue have data, and save the idx in _flag_queue_idx,
164
//so next loop, will check the record idx + 1 first
165
//maybe it's useful with many queue, others maybe always 0
166
19.1k
bool DataQueue::remaining_has_data() {
167
19.1k
    int count = _child_count;
168
47.0k
    while (--count >= 0) {
169
35.6k
        _flag_queue_idx++;
170
35.6k
        if (_flag_queue_idx == _child_count) {
171
15.2k
            _flag_queue_idx = 0;
172
15.2k
        }
173
35.6k
        if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) {
174
7.72k
            return true;
175
7.72k
        }
176
35.6k
    }
177
11.4k
    return false;
178
19.1k
}
179
180
//the _flag_queue_idx indicate which queue has data, and in check can_read
181
//will be set idx in remaining_has_data function
182
11.9k
Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) {
183
11.9k
    const int idx = _flag_queue_idx;
184
11.9k
    auto& sub = *_sub_queues[idx];
185
186
11.9k
    sub.try_pop(output_block);
187
11.9k
    if (*output_block) {
188
7.73k
        if (child_idx) {
189
7.73k
            *child_idx = idx;
190
7.73k
        }
191
7.73k
        auto old_total = _cur_blocks_total_nums.fetch_sub(1);
192
7.73k
        if (old_total == 1) {
193
7.20k
            set_source_block();
194
7.20k
        }
195
7.73k
    }
196
11.9k
    return Status::OK();
197
11.9k
}
198
199
7.74k
// Enqueues `block` into the sub-queue of child `child_idx`.
//
// A null `block` is ignored and reported as OK. If the sub-queue has already
// been marked finished, the block is dropped and an EndOfFile status is
// returned. On success set_source_ready() is called and OK is returned.
Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) {
    if (block == nullptr) {
        return Status::OK();
    }

    // try_push increments _cur_blocks_total_nums itself, under the sub-queue's
    // queue_lock and only when the block is really enqueued. Because of that,
    // get_block_from_queue() always sees a counter of at least 1 when it pops
    // a block, so the counter cannot underflow and a rejected push needs no
    // rollback.
    SubQueue& sub = *_sub_queues[child_idx];
    const bool enqueued = sub.try_push(std::move(block), _cur_blocks_total_nums);
    if (!enqueued) {
        return Status::EndOfFile("SubQueue already finished");
    }

    set_source_ready();
    return Status::OK();
}
214
215
15.1k
void DataQueue::set_finish(int child_idx) {
216
15.1k
    auto& sub = *_sub_queues[child_idx];
217
15.1k
    if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) {
218
7.57k
        return;
219
7.57k
    }
220
7.60k
    set_source_ready();
221
7.60k
}
222
223
15.0k
bool DataQueue::is_all_finish() {
224
15.0k
    return _is_all_finished;
225
15.0k
}
226
227
15.3k
void DataQueue::set_source_ready() {
228
15.3k
    LockGuard lc(_source_lock);
229
15.3k
    if (_source_dependency) {
230
15.3k
        _source_dependency->set_ready();
231
15.3k
    }
232
15.3k
}
233
234
7.20k
void DataQueue::set_source_block() {
235
    // Re-check under _source_lock to avoid blocking the source when a concurrent push
236
    // has already added new blocks (or all children have finished) since we observed
237
    // the counter drop to zero.
238
7.20k
    LockGuard lc(_source_lock);
239
7.20k
    if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish()) {
240
3.59k
        _source_dependency->block();
241
3.59k
    }
242
7.20k
}
243
244
} // namespace doris