Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <atomic>
20
#include <cstdint>
21
#include <deque>
22
#include <memory>
23
#include <mutex>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
29
namespace doris {
30
#include "common/compile_check_begin.h"
31
32
class Dependency;
33
34
class DataQueue {
35
public:
36
    //always one is enough, but in union node it's has more children
37
    DataQueue(int child_count = 1);
38
3.60k
    ~DataQueue() = default;
39
40
    Status get_block_from_queue(std::unique_ptr<Block>* block, int* child_idx = nullptr);
41
42
    Status push_block(std::unique_ptr<Block> block, int child_idx = 0);
43
44
    std::unique_ptr<Block> get_free_block(int child_idx = 0);
45
46
    void push_free_block(std::unique_ptr<Block> output_block, int child_idx = 0);
47
48
    void clear_free_blocks();
49
50
    void set_finish(int child_idx = 0);
51
    void set_canceled(int child_idx = 0); // should set before finish
52
    bool is_finish(int child_idx = 0);
53
    bool is_all_finish();
54
55
    // This function is not thread safe, should be called in Operator::get_block()
56
    bool remaining_has_data();
57
58
0
    bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
59
60
0
    int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
61
0
    int64_t max_size_of_queue() const { return _max_size_of_queue; }
62
63
3.59k
    void set_source_dependency(std::shared_ptr<Dependency> source_dependency) {
64
3.59k
        _source_dependency = source_dependency;
65
3.59k
    }
66
7.14k
    void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
67
7.14k
        _sink_dependencies[child_idx] = sink_dependency;
68
7.14k
    }
69
70
    void set_source_ready();
71
    void set_source_block();
72
73
7.18k
    void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; }
74
75
0
    void set_low_memory_mode() {
76
0
        _is_low_memory_mode = true;
77
0
        _max_blocks_in_sub_queue = 1;
78
0
        clear_free_blocks();
79
0
    }
80
81
    void terminate();
82
83
private:
84
    std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
85
    std::vector<std::deque<std::unique_ptr<Block>>> _queue_blocks;
86
87
    std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
88
    std::vector<std::deque<std::unique_ptr<Block>>> _free_blocks;
89
90
    //how many deque will be init, always will be one
91
    int _child_count = 0;
92
    std::vector<std::atomic_bool> _is_finished;
93
    std::atomic_uint32_t _un_finished_counter;
94
    std::atomic_bool _is_all_finished = false;
95
    std::vector<std::atomic_bool> _is_canceled;
96
    // int64_t just for counter of profile
97
    std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
98
    std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
99
    std::atomic_uint32_t _cur_blocks_total_nums = 0;
100
101
    //this will be indicate which queue has data, it's useful when have many queues
102
    std::atomic_int _flag_queue_idx = 0;
103
    // only used by streaming agg source operator
104
105
    std::atomic_bool _is_low_memory_mode = false;
106
    std::atomic_int64_t _max_blocks_in_sub_queue = 1;
107
108
    //this only use to record the queue[0] for profile
109
    int64_t _max_bytes_in_queue = 0;
110
    int64_t _max_size_of_queue = 0;
111
    static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
112
113
    // data queue is multi sink one source
114
    std::shared_ptr<Dependency> _source_dependency = nullptr;
115
    std::vector<Dependency*> _sink_dependencies;
116
    std::mutex _source_lock;
117
};
118
119
#include "common/compile_check_end.h"
120
} // namespace doris