be/src/runtime/external_scan_context_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/external_scan_context_mgr.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | // IWYU pragma: no_include <bits/chrono.h> |
22 | | #include <chrono> // IWYU pragma: keep |
23 | | #include <ostream> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/config.h" |
27 | | #include "common/metrics/doris_metrics.h" |
28 | | #include "common/metrics/metrics.h" |
29 | | #include "runtime/exec_env.h" |
30 | | #include "runtime/fragment_mgr.h" |
31 | | #include "runtime/result_queue_mgr.h" |
32 | | #include "util/thread.h" |
33 | | #include "util/uid_util.h" |
34 | | |
35 | | namespace doris { |
36 | | |
// Prototype of the unit-less gauge metric `active_scan_context_count`.
// It is registered as a hook metric in the ExternalScanContextMgr constructor
// and deregistered again in ExternalScanContextMgr::stop().
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(active_scan_context_count, MetricUnit::NOUNIT);
38 | | |
39 | | ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) |
40 | 11 | : _exec_env(exec_env), _stop_background_threads_latch(1) { |
41 | | // start the reaper thread for gc the expired context |
42 | 11 | CHECK(Thread::create( |
43 | 11 | "ExternalScanContextMgr", "gc_expired_context", |
44 | 11 | [this]() { this->gc_expired_context(); }, &_keep_alive_reaper) |
45 | 11 | .ok()); |
46 | | |
47 | 11 | REGISTER_HOOK_METRIC(active_scan_context_count, [this]() { |
48 | | // std::lock_guard<std::mutex> l(_lock); |
49 | 11 | return _active_contexts.size(); |
50 | 11 | }); |
51 | 11 | } |
52 | | |
53 | 3 | void ExternalScanContextMgr::stop() { |
54 | 3 | DEREGISTER_HOOK_METRIC(active_scan_context_count); |
55 | 3 | _stop_background_threads_latch.count_down(); |
56 | 3 | if (_keep_alive_reaper) { |
57 | 3 | _keep_alive_reaper->join(); |
58 | 3 | } |
59 | 3 | } |
60 | | |
61 | 6 | Status ExternalScanContextMgr::create_scan_context(std::shared_ptr<ScanContext>* p_context) { |
62 | 6 | std::string context_id = generate_uuid_string(); |
63 | 6 | std::shared_ptr<ScanContext> context(new ScanContext(context_id)); |
64 | | // context->last_access_time = time(nullptr); |
65 | 6 | { |
66 | 6 | std::lock_guard<std::mutex> l(_lock); |
67 | 6 | _active_contexts.insert(std::make_pair(context_id, context)); |
68 | 6 | } |
69 | 6 | *p_context = context; |
70 | 6 | return Status::OK(); |
71 | 6 | } |
72 | | |
73 | | Status ExternalScanContextMgr::get_scan_context(const std::string& context_id, |
74 | 8 | std::shared_ptr<ScanContext>* p_context) { |
75 | 8 | { |
76 | 8 | std::lock_guard<std::mutex> l(_lock); |
77 | 8 | auto iter = _active_contexts.find(context_id); |
78 | 8 | if (iter != _active_contexts.end()) { |
79 | 6 | *p_context = iter->second; |
80 | 6 | } else { |
81 | 2 | LOG(WARNING) << "get_scan_context error: context id [ " << context_id << " ] not found"; |
82 | 2 | std::stringstream msg; |
83 | 2 | msg << "context_id: " << context_id << " not found"; |
84 | 2 | return Status::NotFound(msg.str()); |
85 | 2 | } |
86 | 8 | } |
87 | 6 | return Status::OK(); |
88 | 8 | } |
89 | | |
90 | 3 | Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) { |
91 | 3 | std::shared_ptr<ScanContext> context; |
92 | 3 | { |
93 | 3 | std::lock_guard<std::mutex> l(_lock); |
94 | 3 | auto iter = _active_contexts.find(context_id); |
95 | 3 | if (iter != _active_contexts.end()) { |
96 | 3 | context = iter->second; |
97 | 3 | if (context == nullptr) { |
98 | 0 | _active_contexts.erase(context_id); |
99 | 0 | return Status::OK(); |
100 | 0 | } |
101 | 3 | iter = _active_contexts.erase(iter); |
102 | 3 | } |
103 | 3 | } |
104 | 3 | if (context != nullptr) { |
105 | | // first cancel the fragment instance, just ignore return status |
106 | 3 | _exec_env->fragment_mgr()->cancel_query(context->query_id, |
107 | 3 | Status::InternalError("cancelled by clear thread")); |
108 | | // clear the fragment instance's related result queue |
109 | 3 | static_cast<void>(_exec_env->result_queue_mgr()->cancel(context->fragment_instance_id)); |
110 | 3 | LOG(INFO) << "close scan context: context id [ " << context_id << " ]"; |
111 | 3 | } |
112 | 3 | return Status::OK(); |
113 | 3 | } |
114 | | |
115 | 7 | void ExternalScanContextMgr::gc_expired_context() { |
116 | 7 | #ifndef BE_TEST |
117 | 49 | while (!_stop_background_threads_latch.wait_for( |
118 | 49 | std::chrono::seconds(doris::config::scan_context_gc_interval_min * 60))) { |
119 | 42 | time_t current_time = time(nullptr); |
120 | 42 | std::vector<std::shared_ptr<ScanContext>> expired_contexts; |
121 | 42 | { |
122 | 42 | std::lock_guard<std::mutex> l(_lock); |
123 | 44 | for (auto iter = _active_contexts.begin(); iter != _active_contexts.end();) { |
124 | 2 | auto context = iter->second; |
125 | 2 | if (context == nullptr) { |
126 | 0 | iter = _active_contexts.erase(iter); |
127 | 0 | continue; |
128 | 0 | } |
129 | | // being processed or timeout is disabled |
130 | 2 | if (context->last_access_time == -1) { |
131 | 0 | ++iter; // advance one entry |
132 | 0 | continue; |
133 | 0 | } |
134 | | // free context if context is idle for context->keep_alive_min |
135 | 2 | if (current_time - context->last_access_time > context->keep_alive_min * 60) { |
136 | 1 | LOG(INFO) << "gc expired scan context: context id [ " << context->context_id |
137 | 1 | << " ]"; |
138 | 1 | expired_contexts.push_back(context); |
139 | 1 | iter = _active_contexts.erase(iter); |
140 | 1 | } else { |
141 | 1 | ++iter; // advanced |
142 | 1 | } |
143 | 2 | } |
144 | 42 | } |
145 | 42 | for (auto expired_context : expired_contexts) { |
146 | | // must cancel the fragment instance, otherwise return thrift transport TTransportException |
147 | 1 | _exec_env->fragment_mgr()->cancel_query( |
148 | 1 | expired_context->query_id, Status::InternalError("scan context is expired")); |
149 | 1 | static_cast<void>( |
150 | 1 | _exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id)); |
151 | 1 | } |
152 | 42 | } |
153 | 7 | #endif |
154 | 7 | } |
155 | | } // namespace doris |