Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_queue_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_queue_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
22
#include <utility>
23
24
#include "common/config.h"
25
#include "common/metrics/doris_metrics.h"
26
#include "common/metrics/metrics.h"
27
#include "common/status.h"
28
#include "runtime/record_batch_queue.h"
29
#include "util/hash_util.hpp"
30
31
namespace doris {
32
33
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(result_block_queue_count, MetricUnit::NOUNIT);
34
35
17
ResultQueueMgr::ResultQueueMgr() {
36
    // Each BlockingQueue has a limited size (default 20, by config::max_memory_sink_batch_count),
37
    // it's not needed to count the actual size of all BlockingQueue.
38
17
    REGISTER_HOOK_METRIC(result_block_queue_count, [this]() {
39
        // std::lock_guard<std::mutex> l(_lock);
40
17
        return _fragment_queue_map.size();
41
17
    });
42
17
}
43
44
13
ResultQueueMgr::~ResultQueueMgr() {
45
13
    DEREGISTER_HOOK_METRIC(result_block_queue_count);
46
13
}
47
48
Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id,
49
7
                                    std::shared_ptr<arrow::RecordBatch>* result, bool* eos) {
50
7
    BlockQueueSharedPtr queue;
51
7
    {
52
7
        std::lock_guard<std::mutex> l(_lock);
53
7
        auto iter = _fragment_queue_map.find(fragment_instance_id);
54
7
        if (_fragment_queue_map.end() != iter) {
55
7
            queue = iter->second;
56
7
        } else {
57
0
            return Status::InternalError("fragment_instance_id does not exists");
58
0
        }
59
7
    }
60
    // check queue status before get result
61
7
    RETURN_IF_ERROR(queue->status());
62
7
    bool success = queue->blocking_get(result);
63
7
    if (success) {
64
        // sentinel nullptr indicates scan end
65
7
        if (*result == nullptr) {
66
3
            *eos = true;
67
            // put sentinel for consistency, avoid repeated invoking fetch result when have no rowbatch
68
3
            queue->blocking_put(nullptr);
69
4
        } else {
70
4
            *eos = false;
71
4
        }
72
7
    } else {
73
0
        *eos = true;
74
0
    }
75
7
    return Status::OK();
76
7
}
77
78
void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
79
10
                                  BlockQueueSharedPtr* queue) {
80
10
    std::lock_guard<std::mutex> l(_lock);
81
10
    auto iter = _fragment_queue_map.find(fragment_instance_id);
82
10
    if (iter != _fragment_queue_map.end()) {
83
1
        *queue = iter->second;
84
9
    } else {
85
        // max_elements will not take effect, because when queue size reaches max_memory_sink_batch_count,
86
        // MemoryScratchSink will block queue dependency, in this way, one queue have 20 * 1024 rows at most.
87
        // use MemoryScratchSink queue dependency instead of BlockingQueue to achieve blocking.
88
9
        BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2));
89
9
        _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
90
9
        *queue = tmp;
91
9
    }
92
10
}
93
94
6
Status ResultQueueMgr::cancel(const TUniqueId& fragment_instance_id) {
95
6
    std::lock_guard<std::mutex> l(_lock);
96
6
    auto iter = _fragment_queue_map.find(fragment_instance_id);
97
6
    if (iter != _fragment_queue_map.end()) {
98
        // first remove RecordBatch from queue
99
        // avoid MemoryScratchSink block on send or close operation
100
5
        iter->second->shutdown();
101
        // remove this queue from map
102
5
        _fragment_queue_map.erase(fragment_instance_id);
103
5
    }
104
6
    return Status::OK();
105
6
}
106
107
void ResultQueueMgr::update_queue_status(const TUniqueId& fragment_instance_id,
108
0
                                         const Status& status) {
109
0
    if (status.ok()) {
110
0
        return;
111
0
    }
112
0
    std::lock_guard<std::mutex> l(_lock);
113
0
    auto iter = _fragment_queue_map.find(fragment_instance_id);
114
0
    if (iter != _fragment_queue_map.end()) {
115
0
        iter->second->update_status(status);
116
0
    }
117
0
}
118
119
} // namespace doris