Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/data_queue.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
#include <mutex>
24
#include <utility>
25
26
#include "core/block/block.h"
27
#include "exec/pipeline/dependency.h"
28
29
namespace doris {
30
// Builds a data queue with one sub-queue per child.
//
// Every per-child container (queue/free-list locks, block deques, finished and
// canceled flags, byte and block counters) is sized to `child_count`, then each
// slot is given its own mutex and a zero/false starting state.
// `_un_finished_counter` starts at `child_count`, and every sink dependency
// slot starts out as nullptr.
DataQueue::DataQueue(int child_count)
        : _queue_blocks_lock(child_count),
          _queue_blocks(child_count),
          _free_blocks_lock(child_count),
          _free_blocks(child_count),
          _child_count(child_count),
          _is_finished(child_count),
          _is_canceled(child_count),
          _cur_bytes_in_queue(child_count),
          _cur_blocks_nums_in_queue(child_count),
          _flag_queue_idx(0) {
    // No child has finished yet, and no sink dependency has been registered.
    _un_finished_counter = child_count;
    _sink_dependencies.resize(child_count, nullptr);

    for (int child = 0; child < child_count; ++child) {
        // Per-child state flags and counters.
        _is_finished[child] = false;
        _is_canceled[child] = false;
        _cur_bytes_in_queue[child] = 0;
        _cur_blocks_nums_in_queue[child] = 0;

        // One mutex for the data deque and one for the free-block deque.
        _queue_blocks_lock[child].reset(new std::mutex());
        _free_blocks_lock[child].reset(new std::mutex());
    }
}
52
53
10
std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
54
10
    {
55
10
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
56
10
        if (!_free_blocks[child_idx].empty()) {
57
1
            auto block = std::move(_free_blocks[child_idx].front());
58
1
            _free_blocks[child_idx].pop_front();
59
1
            return block;
60
1
        }
61
10
    }
62
63
9
    return Block::create_unique();
64
10
}
65
66
10
void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) {
67
10
    DCHECK(block->rows() == 0);
68
69
10
    if (!_is_low_memory_mode) {
70
10
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
71
10
        _free_blocks[child_idx].emplace_back(std::move(block));
72
10
    }
73
10
}
74
75
1
void DataQueue::clear_free_blocks() {
76
4
    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
77
3
        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
78
3
        std::deque<std::unique_ptr<Block>> tmp_queue;
79
3
        _free_blocks[child_idx].swap(tmp_queue);
80
3
    }
81
1
}
82
83
0
void DataQueue::terminate() {
84
0
    for (int i = 0; i < _queue_blocks.size(); i++) {
85
0
        set_finish(i);
86
0
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[i]));
87
0
        if (_cur_blocks_nums_in_queue[i] > 0) {
88
0
            _queue_blocks[i].clear();
89
0
            _cur_bytes_in_queue[i] = 0;
90
0
            _cur_blocks_nums_in_queue[i] = 0;
91
0
            _sink_dependencies[i]->set_always_ready();
92
0
        }
93
0
    }
94
0
    clear_free_blocks();
95
0
}
96
97
//check which queue have data, and save the idx in _flag_queue_idx,
98
//so next loop, will check the record idx + 1 first
99
//maybe it's useful with many queue, others maybe always 0
100
167
bool DataQueue::remaining_has_data() {
101
167
    int count = _child_count;
102
203
    while (--count >= 0) {
103
193
        _flag_queue_idx++;
104
193
        if (_flag_queue_idx == _child_count) {
105
65
            _flag_queue_idx = 0;
106
65
        }
107
193
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
108
157
            return true;
109
157
        }
110
193
    }
111
10
    return false;
112
167
}
113
114
//the _flag_queue_idx indicate which queue has data, and in check can_read
115
//will be set idx in remaining_has_data function
116
164
Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) {
117
164
    if (_is_canceled[_flag_queue_idx]) {
118
0
        return Status::InternalError("Current queue of idx {} have beed canceled: ",
119
0
                                     _flag_queue_idx);
120
0
    }
121
122
164
    {
123
164
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[_flag_queue_idx]));
124
164
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
125
162
            *output_block = std::move(_queue_blocks[_flag_queue_idx].front());
126
162
            _queue_blocks[_flag_queue_idx].pop_front();
127
162
            if (child_idx) {
128
162
                *child_idx = _flag_queue_idx;
129
162
            }
130
162
            _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
131
162
            _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
132
162
            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
133
88
                _sink_dependencies[_flag_queue_idx]->set_ready();
134
88
            }
135
162
            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
136
162
            if (old_value == 1 && _source_dependency) {
137
10
                set_source_block();
138
10
            }
139
162
        }
140
164
    }
141
164
    return Status::OK();
142
164
}
143
144
163
// Appends `block` to the sub-queue of child `child_idx`.
//
// A null block is ignored and OK is returned. If the child has already been
// marked finished, EndOfFile is returned and the block is not queued.
// Otherwise the per-child byte/block counters and the global block counter are
// increased and the source is signalled ready. The child's sink dependency is
// blocked once its sub-queue holds more than `_max_blocks_in_sub_queue` blocks.
Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) {
    if (block == nullptr) {
        return Status::OK();
    }

    {
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> guard(*_queue_blocks_lock[child_idx]));
        if (_is_finished[child_idx]) {
            return Status::EndOfFile("Already finish");
        }

        // Account for the bytes before ownership of the block is moved away.
        _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
        _queue_blocks[child_idx].emplace_back(std::move(block));
        _cur_blocks_nums_in_queue[child_idx] += 1;

        const bool over_limit = _cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue;
        if (over_limit) {
            _sink_dependencies[child_idx]->block();
        }

        ++_cur_blocks_total_nums;
        set_source_ready();
    }

    return Status::OK();
}
166
167
9
void DataQueue::set_finish(int child_idx) {
168
9
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
169
9
    if (_is_finished[child_idx]) {
170
0
        return;
171
0
    }
172
9
    _is_finished[child_idx] = true;
173
9
    if (_un_finished_counter.fetch_sub(1) == 1) {
174
5
        _is_all_finished = true;
175
5
    }
176
9
    set_source_ready();
177
9
}
178
179
0
void DataQueue::set_canceled(int child_idx) {
180
0
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
181
0
    DCHECK(!_is_finished[child_idx]);
182
0
    _is_canceled[child_idx] = true;
183
0
    _is_finished[child_idx] = true;
184
0
    if (_un_finished_counter.fetch_sub(1) == 1) {
185
0
        _is_all_finished = true;
186
0
    }
187
0
    set_source_ready();
188
0
}
189
190
3
bool DataQueue::is_finish(int child_idx) {
191
3
    return _is_finished[child_idx];
192
3
}
193
194
26
bool DataQueue::is_all_finish() {
195
26
    return _is_all_finished;
196
26
}
197
198
172
void DataQueue::set_source_ready() {
199
172
    if (_source_dependency) {
200
172
        std::unique_lock lc(_source_lock);
201
172
        _source_dependency->set_ready();
202
172
    }
203
172
}
204
205
10
void DataQueue::set_source_block() {
206
10
    if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
207
7
        std::unique_lock lc(_source_lock);
208
        // Performing the judgment twice, attempting to avoid blocking the source as much as possible.
209
7
        if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
210
7
            _source_dependency->block();
211
7
        }
212
7
    }
213
10
}
214
215
} // namespace doris