/root/doris/be/src/runtime/exec_env.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/exec_env.h" |
19 | | |
20 | | #include <gen_cpp/HeartbeatService_types.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <mutex> |
24 | | #include <utility> |
25 | | |
26 | | #include "common/config.h" |
27 | | #include "common/logging.h" |
28 | | #include "olap/olap_define.h" |
29 | | #include "olap/storage_engine.h" |
30 | | #include "olap/tablet_manager.h" |
31 | | #include "runtime/fragment_mgr.h" |
32 | | #include "runtime/frontend_info.h" |
33 | | #include "runtime/load_stream_mgr.h" |
34 | | #include "util/debug_util.h" |
35 | | #include "util/time.h" |
36 | | #include "vec/runtime/vdata_stream_mgr.h" |
37 | | #include "vec/sink/delta_writer_v2_pool.h" |
38 | | #include "vec/sink/load_stream_map_pool.h" |
39 | | |
40 | | namespace doris { |
41 | | |
42 | | #ifdef BE_TEST |
43 | | void ExecEnv::set_inverted_index_searcher_cache( |
44 | 145 | segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) { |
45 | 145 | _inverted_index_searcher_cache = inverted_index_searcher_cache; |
46 | 145 | } |
47 | 432 | void ExecEnv::set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine) { |
48 | 432 | _storage_engine = std::move(engine); |
49 | 432 | } |
50 | 1 | void ExecEnv::set_write_cooldown_meta_executors() { |
51 | 1 | _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>(); |
52 | 1 | } |
53 | | #endif // BE_TEST |
54 | | |
55 | | Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats, |
56 | 33 | bool force_use_only_cached, bool cache_on_miss) { |
57 | 33 | auto storage_engine = GetInstance()->_storage_engine.get(); |
58 | 33 | return storage_engine != nullptr |
59 | 33 | ? storage_engine->get_tablet(tablet_id, sync_stats, force_use_only_cached, |
60 | 30 | cache_on_miss) |
61 | 33 | : ResultError(Status::InternalError("failed to get tablet {}", tablet_id)); |
62 | 33 | } |
63 | | |
64 | | Status ExecEnv::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta, |
65 | 26 | bool force_use_only_cached) { |
66 | 26 | auto storage_engine = GetInstance()->_storage_engine.get(); |
67 | 26 | if (storage_engine == nullptr) { |
68 | 20 | return Status::InternalError("storage engine is not initialized"); |
69 | 20 | } |
70 | 6 | return storage_engine->get_tablet_meta(tablet_id, tablet_meta, force_use_only_cached); |
71 | 26 | } |
72 | | |
73 | 4 | const std::string& ExecEnv::token() const { |
74 | 4 | return _cluster_info->token; |
75 | 4 | } |
76 | | |
77 | 0 | void ExecEnv::clear_stream_mgr() { |
78 | 0 | if (_vstream_mgr) { |
79 | 0 | SAFE_DELETE(_vstream_mgr); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | std::vector<TFrontendInfo> ExecEnv::get_frontends() { |
84 | 0 | std::lock_guard<std::mutex> lg(_frontends_lock); |
85 | 0 | std::vector<TFrontendInfo> infos; |
86 | 0 | for (const auto& cur_fe : _frontends) { |
87 | 0 | infos.push_back(cur_fe.second.info); |
88 | 0 | } |
89 | 0 | return infos; |
90 | 0 | } |
91 | | |
92 | 0 | void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos) { |
93 | 0 | std::lock_guard<std::mutex> lg(_frontends_lock); |
94 | |
|
95 | 0 | std::set<TNetworkAddress> dropped_fes; |
96 | |
|
97 | 0 | for (const auto& cur_fe : _frontends) { |
98 | 0 | dropped_fes.insert(cur_fe.first); |
99 | 0 | } |
100 | |
|
101 | 0 | for (const auto& coming_fe_info : new_fe_infos) { |
102 | 0 | auto itr = _frontends.find(coming_fe_info.coordinator_address); |
103 | |
|
104 | 0 | if (itr == _frontends.end()) { |
105 | 0 | LOG(INFO) << "A completely new frontend, " << PrintFrontendInfo(coming_fe_info); |
106 | |
|
107 | 0 | _frontends.insert(std::pair<TNetworkAddress, FrontendInfo>( |
108 | 0 | coming_fe_info.coordinator_address, |
109 | 0 | FrontendInfo {coming_fe_info, GetCurrentTimeMicros() / 1000, /*first time*/ |
110 | 0 | GetCurrentTimeMicros() / 1000 /*last time*/})); |
111 | |
|
112 | 0 | continue; |
113 | 0 | } |
114 | | |
115 | 0 | dropped_fes.erase(coming_fe_info.coordinator_address); |
116 | |
|
117 | 0 | if (coming_fe_info.process_uuid == 0) { |
118 | 0 | LOG(WARNING) << "Frontend " << PrintFrontendInfo(coming_fe_info) |
119 | 0 | << " is in an unknown state."; |
120 | 0 | } |
121 | |
|
122 | 0 | if (coming_fe_info.process_uuid == itr->second.info.process_uuid) { |
123 | 0 | itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000; |
124 | 0 | continue; |
125 | 0 | } |
126 | | |
127 | | // If we get here, means this frontend has already restarted. |
128 | 0 | itr->second.info.process_uuid = coming_fe_info.process_uuid; |
129 | 0 | itr->second.first_receiving_time_ms = GetCurrentTimeMicros() / 1000; |
130 | 0 | itr->second.last_reveiving_time_ms = GetCurrentTimeMicros() / 1000; |
131 | 0 | LOG(INFO) << "Update frontend " << PrintFrontendInfo(coming_fe_info); |
132 | 0 | } |
133 | |
|
134 | 0 | for (const auto& dropped_fe : dropped_fes) { |
135 | 0 | LOG(INFO) << "Frontend " << PrintThriftNetworkAddress(dropped_fe) |
136 | 0 | << " has already been dropped, remove it"; |
137 | 0 | _frontends.erase(dropped_fe); |
138 | 0 | } |
139 | 0 | } |
140 | | |
141 | 13 | std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_running_frontends() { |
142 | 13 | std::lock_guard<std::mutex> lg(_frontends_lock); |
143 | 13 | std::map<TNetworkAddress, FrontendInfo> res; |
144 | 13 | const int expired_duration = config::fe_expire_duration_seconds * 1000; |
145 | 13 | const auto now = GetCurrentTimeMicros() / 1000; |
146 | | |
147 | 13 | for (const auto& pair : _frontends) { |
148 | 0 | auto& brpc_addr = pair.first; |
149 | 0 | auto& fe_info = pair.second; |
150 | |
|
151 | 0 | if (fe_info.info.process_uuid == 0) { |
152 | | // FE is in an unknown state, regart it as alive. conservative |
153 | 0 | res[brpc_addr] = fe_info; |
154 | 0 | } else { |
155 | 0 | if (now - fe_info.last_reveiving_time_ms < expired_duration) { |
156 | | // If fe info has just been update in last expired_duration, regard it as running. |
157 | 0 | res[brpc_addr] = fe_info; |
158 | 0 | } else { |
159 | | // Fe info has not been udpate for more than expired_duration, regard it as an abnormal. |
160 | | // Abnormal means this fe can not connect to master, and it is not dropped from cluster. |
161 | | // or fe do not have master yet. |
162 | 0 | LOG_EVERY_N(WARNING, 50) << fmt::format( |
163 | 0 | "Frontend {}:{} has not update its hb for more than {} secs, regard it as " |
164 | 0 | "abnormal", |
165 | 0 | brpc_addr.hostname, brpc_addr.port, config::fe_expire_duration_seconds); |
166 | 0 | } |
167 | 0 | } |
168 | 0 | } |
169 | | |
170 | 13 | return res; |
171 | 13 | } |
172 | | |
173 | 0 | void ExecEnv::wait_for_all_tasks_done() { |
174 | | // For graceful shutdown, need to wait for all running queries to stop |
175 | 0 | int32_t wait_seconds_passed = 0; |
176 | 0 | while (true) { |
177 | 0 | int num_queries = _fragment_mgr->running_query_num(); |
178 | 0 | if (num_queries < 1) { |
179 | 0 | break; |
180 | 0 | } |
181 | 0 | if (wait_seconds_passed > doris::config::grace_shutdown_wait_seconds) { |
182 | 0 | LOG(INFO) << "There are still " << num_queries << " queries running, but " |
183 | 0 | << wait_seconds_passed << " seconds passed, has to exist now"; |
184 | 0 | break; |
185 | 0 | } |
186 | 0 | LOG(INFO) << "There are still " << num_queries << " queries running, waiting..."; |
187 | 0 | sleep(1); |
188 | 0 | ++wait_seconds_passed; |
189 | 0 | } |
190 | | // This is a conservative strategy. |
191 | | // Because a query might still have fragments running on other BE nodes. |
192 | | // In other words, the query hasn't truly terminated. |
193 | | // If the current BE is shut down at this point, |
194 | | // the FE will detect the downtime of a related BE and cancel the entire query, |
195 | | // defeating the purpose of a graceful stop. |
196 | 0 | sleep(config::grace_shutdown_post_delay_seconds); |
197 | 0 | } |
198 | | |
199 | 0 | bool ExecEnv::check_auth_token(const std::string& auth_token) { |
200 | 0 | return _cluster_info->curr_auth_token == auth_token || |
201 | 0 | _cluster_info->last_auth_token == auth_token; |
202 | 0 | } |
203 | | |
204 | | } // namespace doris |