be/src/runtime/result_queue_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_queue_mgr.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | |
22 | | #include <utility> |
23 | | |
24 | | #include "common/config.h" |
25 | | #include "common/metrics/doris_metrics.h" |
26 | | #include "common/metrics/metrics.h" |
27 | | #include "common/status.h" |
28 | | #include "runtime/record_batch_queue.h" |
29 | | #include "util/hash_util.hpp" |
30 | | |
31 | | namespace doris { |
32 | | |
// Gauge metric prototype named `result_block_queue_count` (unit: MetricUnit::NOUNIT).
// ResultQueueMgr's constructor hooks it to the number of entries in `_fragment_queue_map`.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(result_block_queue_count, MetricUnit::NOUNIT);
34 | | |
35 | 17 | ResultQueueMgr::ResultQueueMgr() { |
36 | | // Each BlockingQueue has a limited size (default 20, by config::max_memory_sink_batch_count), |
37 | | // it's not needed to count the actual size of all BlockingQueue. |
38 | 17 | REGISTER_HOOK_METRIC(result_block_queue_count, [this]() { |
39 | | // std::lock_guard<std::mutex> l(_lock); |
40 | 17 | return _fragment_queue_map.size(); |
41 | 17 | }); |
42 | 17 | } |
43 | | |
// Deregisters the `result_block_queue_count` hook metric that the constructor registered.
ResultQueueMgr::~ResultQueueMgr() {
    DEREGISTER_HOOK_METRIC(result_block_queue_count);
}
47 | | |
48 | | Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, |
49 | 7 | std::shared_ptr<arrow::RecordBatch>* result, bool* eos) { |
50 | 7 | BlockQueueSharedPtr queue; |
51 | 7 | { |
52 | 7 | std::lock_guard<std::mutex> l(_lock); |
53 | 7 | auto iter = _fragment_queue_map.find(fragment_instance_id); |
54 | 7 | if (_fragment_queue_map.end() != iter) { |
55 | 7 | queue = iter->second; |
56 | 7 | } else { |
57 | 0 | return Status::InternalError("fragment_instance_id does not exists"); |
58 | 0 | } |
59 | 7 | } |
60 | | // check queue status before get result |
61 | 7 | RETURN_IF_ERROR(queue->status()); |
62 | 7 | bool success = queue->blocking_get(result); |
63 | 7 | if (success) { |
64 | | // sentinel nullptr indicates scan end |
65 | 7 | if (*result == nullptr) { |
66 | 3 | *eos = true; |
67 | | // put sentinel for consistency, avoid repeated invoking fetch result when have no rowbatch |
68 | 3 | queue->blocking_put(nullptr); |
69 | 4 | } else { |
70 | 4 | *eos = false; |
71 | 4 | } |
72 | 7 | } else { |
73 | 0 | *eos = true; |
74 | 0 | } |
75 | 7 | return Status::OK(); |
76 | 7 | } |
77 | | |
78 | | void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id, |
79 | 10 | BlockQueueSharedPtr* queue) { |
80 | 10 | std::lock_guard<std::mutex> l(_lock); |
81 | 10 | auto iter = _fragment_queue_map.find(fragment_instance_id); |
82 | 10 | if (iter != _fragment_queue_map.end()) { |
83 | 1 | *queue = iter->second; |
84 | 9 | } else { |
85 | | // max_elements will not take effect, because when queue size reaches max_memory_sink_batch_count, |
86 | | // MemoryScratchSink will block queue dependency, in this way, one queue have 20 * 1024 rows at most. |
87 | | // use MemoryScratchSink queue dependency instead of BlockingQueue to achieve blocking. |
88 | 9 | BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2)); |
89 | 9 | _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp)); |
90 | 9 | *queue = tmp; |
91 | 9 | } |
92 | 10 | } |
93 | | |
94 | 6 | Status ResultQueueMgr::cancel(const TUniqueId& fragment_instance_id) { |
95 | 6 | std::lock_guard<std::mutex> l(_lock); |
96 | 6 | auto iter = _fragment_queue_map.find(fragment_instance_id); |
97 | 6 | if (iter != _fragment_queue_map.end()) { |
98 | | // first remove RecordBatch from queue |
99 | | // avoid MemoryScratchSink block on send or close operation |
100 | 5 | iter->second->shutdown(); |
101 | | // remove this queue from map |
102 | 5 | _fragment_queue_map.erase(fragment_instance_id); |
103 | 5 | } |
104 | 6 | return Status::OK(); |
105 | 6 | } |
106 | | |
107 | | void ResultQueueMgr::update_queue_status(const TUniqueId& fragment_instance_id, |
108 | 0 | const Status& status) { |
109 | 0 | if (status.ok()) { |
110 | 0 | return; |
111 | 0 | } |
112 | 0 | std::lock_guard<std::mutex> l(_lock); |
113 | 0 | auto iter = _fragment_queue_map.find(fragment_instance_id); |
114 | 0 | if (iter != _fragment_queue_map.end()) { |
115 | 0 | iter->second->update_status(status); |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | } // namespace doris |