Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <atomic>
20
#include <cstdint>
21
#include <deque>
22
#include <memory>
23
#include <mutex>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
29
namespace doris {
30
31
class Dependency;
32
33
class DataQueue {
34
public:
35
    // A single child is usually enough, but a union node has more children.
36
    DataQueue(int child_count = 1);
37
5
    ~DataQueue() = default;
38
39
    Status get_block_from_queue(std::unique_ptr<Block>* block, int* child_idx = nullptr);
40
41
    Status push_block(std::unique_ptr<Block> block, int child_idx = 0);
42
43
    std::unique_ptr<Block> get_free_block(int child_idx = 0);
44
45
    void push_free_block(std::unique_ptr<Block> output_block, int child_idx = 0);
46
47
    void clear_free_blocks();
48
49
    void set_finish(int child_idx = 0);
50
    void set_canceled(int child_idx = 0); // should set before finish
51
    bool is_finish(int child_idx = 0);
52
    bool is_all_finish();
53
54
    // This function is not thread-safe; it should be called in Operator::get_block().
55
    bool remaining_has_data();
56
57
0
    bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
58
59
0
    int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
60
0
    int64_t max_size_of_queue() const { return _max_size_of_queue; }
61
62
5
    void set_source_dependency(std::shared_ptr<Dependency> source_dependency) {
63
5
        _source_dependency = source_dependency;
64
5
    }
65
9
    void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
66
9
        _sink_dependencies[child_idx] = sink_dependency;
67
9
    }
68
69
    void set_source_ready();
70
    void set_source_block();
71
72
6
    void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; }
73
74
0
    void set_low_memory_mode() {
75
0
        _is_low_memory_mode = true;
76
0
        _max_blocks_in_sub_queue = 1;
77
0
        clear_free_blocks();
78
0
    }
79
80
    void terminate();
81
82
private:
83
    std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
84
    std::vector<std::deque<std::unique_ptr<Block>>> _queue_blocks;
85
86
    std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
87
    std::vector<std::deque<std::unique_ptr<Block>>> _free_blocks;
88
89
    // Number of deques to initialize; this is usually one.
90
    int _child_count = 0;
91
    std::vector<std::atomic_bool> _is_finished;
92
    std::atomic_uint32_t _un_finished_counter;
93
    std::atomic_bool _is_all_finished = false;
94
    std::vector<std::atomic_bool> _is_canceled;
95
    // int64_t is used just for the profile counter.
96
    std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
97
    std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
98
    std::atomic_uint32_t _cur_blocks_total_nums = 0;
99
100
    // Indicates which queue has data; useful when there are many queues.
101
    std::atomic_int _flag_queue_idx = 0;
102
    // only used by streaming agg source operator
103
104
    std::atomic_bool _is_low_memory_mode = false;
105
    std::atomic_int64_t _max_blocks_in_sub_queue = 1;
106
107
    // These are only used to record values of queue[0] for the profile.
108
    int64_t _max_bytes_in_queue = 0;
109
    int64_t _max_size_of_queue = 0;
110
    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
111
112
    // The data queue has multiple sinks and a single source.
113
    std::shared_ptr<Dependency> _source_dependency = nullptr;
114
    std::vector<Dependency*> _sink_dependencies;
115
    std::mutex _source_lock;
116
};
117
118
} // namespace doris