Coverage Report

Created: 2026-03-14 13:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/data_queue.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
#include <mutex>
24
#include <utility>
25
26
#include "core/block/block.h"
27
#include "exec/pipeline/dependency.h"
28
29
namespace doris {
30
#include "common/compile_check_begin.h"
31
DataQueue::DataQueue(int child_count)
32
3.68k
        : _queue_blocks_lock(child_count),
33
3.68k
          _queue_blocks(child_count),
34
3.68k
          _free_blocks_lock(child_count),
35
3.68k
          _free_blocks(child_count),
36
3.68k
          _child_count(child_count),
37
3.68k
          _is_finished(child_count),
38
3.68k
          _is_canceled(child_count),
39
3.68k
          _cur_bytes_in_queue(child_count),
40
3.68k
          _cur_blocks_nums_in_queue(child_count),
41
3.68k
          _flag_queue_idx(0) {
42
11.0k
    for (int i = 0; i < child_count; ++i) {
43
7.31k
        _queue_blocks_lock[i].reset(new std::mutex());
44
7.31k
        _free_blocks_lock[i].reset(new std::mutex());
45
7.31k
        _is_finished[i] = false;
46
7.31k
        _is_canceled[i] = false;
47
7.31k
        _cur_bytes_in_queue[i] = 0;
48
7.31k
        _cur_blocks_nums_in_queue[i] = 0;
49
7.31k
    }
50
3.68k
    _un_finished_counter = child_count;
51
3.68k
    _sink_dependencies.resize(child_count, nullptr);
52
3.68k
}
53
54
7.07k
std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
55
7.07k
    {
56
7.07k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
57
7.07k
        if (!_free_blocks[child_idx].empty()) {
58
3
            auto block = std::move(_free_blocks[child_idx].front());
59
3
            _free_blocks[child_idx].pop_front();
60
3
            return block;
61
3
        }
62
7.07k
    }
63
64
7.06k
    return Block::create_unique();
65
7.07k
}
66
67
7.06k
void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) {
68
7.06k
    DCHECK(block->rows() == 0);
69
70
7.06k
    if (!_is_low_memory_mode) {
71
7.06k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]));
72
7.06k
        _free_blocks[child_idx].emplace_back(std::move(block));
73
7.06k
    }
74
7.06k
}
75
76
3.33k
void DataQueue::clear_free_blocks() {
77
10.3k
    for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
78
7.05k
        std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
79
7.05k
        std::deque<std::unique_ptr<Block>> tmp_queue;
80
7.05k
        _free_blocks[child_idx].swap(tmp_queue);
81
7.05k
    }
82
3.33k
}
83
84
3.32k
void DataQueue::terminate() {
85
10.3k
    for (int i = 0; i < _queue_blocks.size(); i++) {
86
7.05k
        set_finish(i);
87
7.05k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[i]));
88
7.05k
        if (_cur_blocks_nums_in_queue[i] > 0) {
89
4
            _queue_blocks[i].clear();
90
4
            _cur_bytes_in_queue[i] = 0;
91
4
            _cur_blocks_nums_in_queue[i] = 0;
92
4
            _sink_dependencies[i]->set_always_ready();
93
4
        }
94
7.05k
    }
95
3.32k
    clear_free_blocks();
96
3.32k
}
97
98
//check which queue have data, and save the idx in _flag_queue_idx,
99
//so next loop, will check the record idx + 1 first
100
//maybe it's useful with many queue, others maybe always 0
101
27.0k
bool DataQueue::remaining_has_data() {
102
27.0k
    int count = _child_count;
103
76.9k
    while (--count >= 0) {
104
57.1k
        _flag_queue_idx++;
105
57.1k
        if (_flag_queue_idx == _child_count) {
106
23.4k
            _flag_queue_idx = 0;
107
23.4k
        }
108
57.1k
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
109
7.21k
            return true;
110
7.21k
        }
111
57.1k
    }
112
19.8k
    return false;
113
27.0k
}
114
115
//the _flag_queue_idx indicate which queue has data, and in check can_read
116
//will be set idx in remaining_has_data function
117
20.4k
Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block, int* child_idx) {
118
20.4k
    if (_is_canceled[_flag_queue_idx]) {
119
0
        return Status::InternalError("Current queue of idx {} have beed canceled: ",
120
0
                                     _flag_queue_idx);
121
0
    }
122
123
20.4k
    {
124
20.4k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[_flag_queue_idx]));
125
20.4k
        if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
126
7.24k
            *output_block = std::move(_queue_blocks[_flag_queue_idx].front());
127
7.24k
            _queue_blocks[_flag_queue_idx].pop_front();
128
7.24k
            if (child_idx) {
129
7.23k
                *child_idx = _flag_queue_idx;
130
7.23k
            }
131
7.24k
            _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
132
7.24k
            _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
133
7.24k
            if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
134
7.16k
                _sink_dependencies[_flag_queue_idx]->set_ready();
135
7.16k
            }
136
7.24k
            auto old_value = _cur_blocks_total_nums.fetch_sub(1);
137
7.24k
            if (old_value == 1 && _source_dependency) {
138
6.76k
                set_source_block();
139
6.76k
            }
140
7.24k
        }
141
20.4k
    }
142
20.4k
    return Status::OK();
143
20.4k
}
144
145
7.23k
Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) {
146
7.23k
    if (!block) {
147
0
        return Status::OK();
148
0
    }
149
7.23k
    {
150
7.23k
        INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
151
7.23k
        if (_is_finished[child_idx]) {
152
1
            return Status::EndOfFile("Already finish");
153
1
        }
154
7.23k
        _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
155
7.23k
        _queue_blocks[child_idx].emplace_back(std::move(block));
156
7.23k
        _cur_blocks_nums_in_queue[child_idx] += 1;
157
158
7.23k
        if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
159
80
            _sink_dependencies[child_idx]->block();
160
80
        }
161
7.23k
        _cur_blocks_total_nums++;
162
163
7.23k
        set_source_ready();
164
7.23k
    }
165
0
    return Status::OK();
166
7.23k
}
167
168
14.4k
void DataQueue::set_finish(int child_idx) {
169
14.4k
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
170
14.4k
    if (_is_finished[child_idx]) {
171
7.06k
        return;
172
7.06k
    }
173
7.43k
    _is_finished[child_idx] = true;
174
7.43k
    if (_un_finished_counter.fetch_sub(1) == 1) {
175
3.70k
        _is_all_finished = true;
176
3.70k
    }
177
7.43k
    set_source_ready();
178
7.43k
}
179
180
0
void DataQueue::set_canceled(int child_idx) {
181
0
    INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]));
182
0
    DCHECK(!_is_finished[child_idx]);
183
0
    _is_canceled[child_idx] = true;
184
0
    _is_finished[child_idx] = true;
185
0
    if (_un_finished_counter.fetch_sub(1) == 1) {
186
0
        _is_all_finished = true;
187
0
    }
188
0
    set_source_ready();
189
0
}
190
191
3
bool DataQueue::is_finish(int child_idx) {
192
3
    return _is_finished[child_idx];
193
3
}
194
195
26.7k
bool DataQueue::is_all_finish() {
196
26.7k
    return _is_all_finished;
197
26.7k
}
198
199
14.6k
void DataQueue::set_source_ready() {
200
14.6k
    if (_source_dependency) {
201
14.6k
        std::unique_lock lc(_source_lock);
202
14.6k
        _source_dependency->set_ready();
203
14.6k
    }
204
14.6k
}
205
206
6.76k
void DataQueue::set_source_block() {
207
6.76k
    if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
208
3.43k
        std::unique_lock lc(_source_lock);
209
        // Performing the judgment twice, attempting to avoid blocking the source as much as possible.
210
3.43k
        if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
211
3.43k
            _source_dependency->block();
212
3.43k
        }
213
3.43k
    }
214
6.76k
}
215
216
} // namespace doris