Coverage Report

Created: 2026-05-09 01:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/data_queue.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <atomic>
20
#include <cstdint>
21
#include <deque>
22
#include <memory>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "common/thread_safety_annotations.h"
27
#include "core/block/block.h"
28
29
namespace doris {
30
31
class Dependency;
32
33
// Per child sub-queue. Groups all parallel state so that the lock/field
34
// relationship is explicit and can be checked by clang -Wthread-safety.
35
struct SubQueue {
36
    // Protects the `blocks` deque and serializes high-level state
37
    // transitions (push/pop/finish/cancel) on this sub-queue.
38
    AnnotatedMutex queue_lock;
39
    std::deque<std::unique_ptr<Block>> blocks GUARDED_BY(queue_lock);
40
41
    // The following fields are only accessed while holding queue_lock.
42
    int64_t bytes_in_queue GUARDED_BY(queue_lock) = 0;
43
    bool is_finished GUARDED_BY(queue_lock) = false;
44
45
    // Protects the `free_blocks` deque only.
46
    AnnotatedMutex free_lock;
47
    std::deque<std::unique_ptr<Block>> free_blocks GUARDED_BY(free_lock);
48
49
    // blocks_in_queue is readable from lock-free fast paths (remaining_has_data),
50
    // so it remains atomic and is intentionally not GUARDED_BY.
51
    std::atomic_uint32_t blocks_in_queue {0};
52
53
    // Maximum number of blocks allowed in this sub-queue before the sink is blocked.
54
    // Updated by DataQueue::set_max_blocks_in_sub_queue / set_low_memory_mode.
55
    std::atomic_int64_t max_blocks_in_queue {1};
56
57
    // Set once during init via set_sink_dependency, then read-only.
58
    Dependency* sink_dependency = nullptr;
59
60
    // Pop a block under queue_lock.
61
    // Notifies sink_dependency->set_ready() (outside the lock) if the queue becomes empty.
62
    // output_block is null if the queue was empty.
63
    void try_pop(std::unique_ptr<Block>* output_block);
64
65
    // Push a block under queue_lock and atomically increment total_counter.
66
    // Returns false (without incrementing) if already finished.
67
    // Calls sink_dependency->block() (outside the lock) if the queue exceeds max_blocks_in_queue.
68
    bool try_push(std::unique_ptr<Block> block, std::atomic_uint32_t& total_counter);
69
70
    // Mark this sub-queue finished. Returns false if already finished (idempotent).
71
    // Decrements unfinished_counter and may set all_finished within queue_lock.
72
    bool mark_finished(std::atomic_uint32_t& unfinished_counter, std::atomic_bool& all_finished);
73
74
    // Clear all pending blocks under queue_lock.
75
    // Calls sink_dependency->set_always_ready() (outside the lock) if any blocks were cleared.
76
    void clear_blocks();
77
};
78
79
class DataQueue {
80
public:
81
    //always one is enough, but in union node it has more children
82
    DataQueue(int child_count = 1);
83
649
    ~DataQueue() = default;
84
85
    Status get_block_from_queue(std::unique_ptr<Block>* block, int* child_idx = nullptr);
86
    Status push_block(std::unique_ptr<Block> block, int child_idx = 0);
87
88
    std::unique_ptr<Block> get_free_block(int child_idx = 0);
89
    void push_free_block(std::unique_ptr<Block> output_block, int child_idx = 0);
90
91
    void set_finish(int child_idx = 0);
92
    bool is_all_finish();
93
94
    // This function is not thread safe, should be called in Operator::get_block()
95
    bool remaining_has_data();
96
    bool has_more_data() const;
97
98
    void set_source_dependency(std::shared_ptr<Dependency> source_dependency)
99
            NO_THREAD_SAFETY_ANALYSIS;
100
    void set_sink_dependency(Dependency* sink_dependency, int child_idx);
101
    void set_max_blocks_in_sub_queue(int64_t max_blocks);
102
    void set_low_memory_mode();
103
104
    void terminate();
105
106
private:
107
    void clear_free_blocks();
108
    void set_source_ready();
109
    void set_source_block();
110
111
    std::vector<std::unique_ptr<SubQueue>> _sub_queues;
112
113
    //how many deques will be initialized; always will be one
114
    int _child_count = 0;
115
    std::atomic_uint32_t _un_finished_counter = 0;
116
    std::atomic_bool _is_all_finished = false;
117
    std::atomic_uint32_t _cur_blocks_total_nums = 0;
118
119
    //this will indicate which queue has data; it's useful when there are many queues
120
    std::atomic_int _flag_queue_idx = 0;
121
    // only used by streaming agg source operator
122
123
    std::atomic_bool _is_low_memory_mode = false;
124
125
    // _source_dependency is written once during initialization (set_source_dependency)
126
    // and read/used only while holding _source_lock thereafter.
127
    std::shared_ptr<Dependency> _source_dependency GUARDED_BY(_source_lock) = nullptr;
128
    AnnotatedMutex _source_lock;
129
};
130
131
} // namespace doris