be/src/agent/agent_server.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "agent/agent_server.h" |
19 | | |
20 | | #include <gen_cpp/AgentService_types.h> |
21 | | #include <gen_cpp/HeartbeatService_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <stdint.h> |
24 | | #include <thrift/protocol/TDebugProtocol.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <memory> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | |
31 | | #include "agent/task_worker_pool.h" |
32 | | #include "agent/topic_subscriber.h" |
33 | | #include "agent/utils.h" |
34 | | #include "agent/workload_group_listener.h" |
35 | | #include "agent/workload_sched_policy_listener.h" |
36 | | #include "cloud/config.h" |
37 | | #include "common/config.h" |
38 | | #include "common/logging.h" |
39 | | #include "common/status.h" |
40 | | #include "runtime/exec_env.h" |
41 | | #include "storage/olap_define.h" |
42 | | #include "storage/options.h" |
43 | | #include "storage/snapshot/snapshot_manager.h" |
44 | | #include "storage/storage_engine.h" |
45 | | #include "util/work_thread_pool.hpp" |
46 | | |
47 | | namespace doris { |
48 | | |
49 | | AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info) |
50 | 6 | : _cluster_info(cluster_info), _topic_subscriber(new TopicSubscriber()) { |
51 | 6 | MasterServerClient::create(cluster_info); |
52 | | |
53 | 6 | #if !defined(BE_TEST) && !defined(__APPLE__) |
54 | | // Add subscriber here and register listeners |
55 | 6 | std::unique_ptr<TopicListener> wg_listener = std::make_unique<WorkloadGroupListener>(exec_env); |
56 | 6 | LOG(INFO) << "Register workload group listener"; |
57 | 6 | _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_GROUP, |
58 | 6 | std::move(wg_listener)); |
59 | | |
60 | 6 | std::unique_ptr<TopicListener> policy_listener = |
61 | 6 | std::make_unique<WorkloadschedPolicyListener>(exec_env); |
62 | 6 | LOG(INFO) << "Register workload scheduler policy listener"; |
63 | 6 | _topic_subscriber->register_listener(doris::TTopicInfoType::type::WORKLOAD_SCHED_POLICY, |
64 | 6 | std::move(policy_listener)); |
65 | | |
66 | 6 | #endif |
67 | 6 | } |
68 | | |
69 | 3 | AgentServer::~AgentServer() = default; |
70 | | |
71 | | class PushTaskWorkerPool final : public TaskWorkerPoolIf { |
72 | | public: |
73 | | PushTaskWorkerPool(StorageEngine& engine) |
74 | | : _push_delete_workers( |
75 | 6 | TaskWorkerPool("DELETE", config::delete_worker_count, |
76 | 22 | [&engine](auto&& task) { push_callback(engine, task); })), |
77 | 6 | _push_load_workers(PriorTaskWorkerPool( |
78 | 6 | "PUSH", config::push_worker_count_normal_priority, |
79 | 6 | config::push_worker_count_high_priority, |
80 | 6 | [&engine](auto&& task) { push_callback(engine, task); })) {} |
81 | | |
82 | 3 | ~PushTaskWorkerPool() override { stop(); } |
83 | | |
84 | 3 | void stop() { |
85 | 3 | _push_delete_workers.stop(); |
86 | 3 | _push_load_workers.stop(); |
87 | 3 | } |
88 | | |
89 | 22 | Status submit_task(const TAgentTaskRequest& task) override { |
90 | 22 | if (task.push_req.push_type == TPushType::LOAD_V2) { |
91 | 0 | return _push_load_workers.submit_task(task); |
92 | 22 | } else if (task.push_req.push_type == TPushType::DELETE) { |
93 | 22 | return _push_delete_workers.submit_task(task); |
94 | 22 | } else { |
95 | 0 | return Status::InvalidArgument( |
96 | 0 | "task(signature={}, type={}, push_type={}) has wrong push_type", task.signature, |
97 | 0 | task.task_type, task.push_req.push_type); |
98 | 0 | } |
99 | 22 | } |
100 | | |
101 | | private: |
102 | | TaskWorkerPool _push_delete_workers; |
103 | | PriorTaskWorkerPool _push_load_workers; |
104 | | }; |
105 | | |
106 | 6 | void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) { |
107 | 10 | for (const auto& path : exec_env->store_paths()) { |
108 | 10 | try { |
109 | 10 | std::string dpp_download_path_str = path.path + "/" + DPP_PREFIX; |
110 | 10 | std::filesystem::path dpp_download_path(dpp_download_path_str); |
111 | 10 | if (std::filesystem::exists(dpp_download_path)) { |
112 | 0 | std::filesystem::remove_all(dpp_download_path); |
113 | 0 | } |
114 | 10 | } catch (...) { |
115 | 0 | LOG(WARNING) << "boost exception when remove dpp download path. path=" << path.path; |
116 | 0 | } |
117 | 10 | } |
118 | | |
119 | | // clang-format off |
120 | 6 | _workers[TTaskType::ALTER_INVERTED_INDEX] = std::make_unique<TaskWorkerPool>( |
121 | 6 | "ALTER_INVERTED_INDEX", config::alter_index_worker_count, [&engine](auto&& task) { return alter_inverted_index_callback(engine, task); }); |
122 | | |
123 | 6 | _workers[TTaskType::CHECK_CONSISTENCY] = std::make_unique<TaskWorkerPool>( |
124 | 6 | "CHECK_CONSISTENCY", config::check_consistency_worker_count, [&engine](auto&& task) { return check_consistency_callback(engine, task); }); |
125 | | |
126 | 6 | _workers[TTaskType::UPLOAD] = std::make_unique<TaskWorkerPool>( |
127 | 16 | "UPLOAD", config::upload_worker_count, [&engine, exec_env](auto&& task) { return upload_callback(engine, exec_env, task); }); |
128 | | |
129 | 6 | _workers[TTaskType::DOWNLOAD] = std::make_unique<TaskWorkerPool>( |
130 | 22 | "DOWNLOAD", config::download_worker_count, [&engine, exec_env](auto&& task) { return download_callback(engine, exec_env, task); }); |
131 | | |
132 | 6 | _workers[TTaskType::MAKE_SNAPSHOT] = std::make_unique<TaskWorkerPool>( |
133 | 320 | "MAKE_SNAPSHOT", config::make_snapshot_worker_count, [&engine](auto&& task) { return make_snapshot_callback(engine, task); }); |
134 | | |
135 | 6 | _workers[TTaskType::RELEASE_SNAPSHOT] = std::make_unique<TaskWorkerPool>( |
136 | 320 | "RELEASE_SNAPSHOT", config::release_snapshot_worker_count, [&engine](auto&& task) { return release_snapshot_callback(engine, task); }); |
137 | | |
138 | 6 | _workers[TTaskType::MOVE] = std::make_unique<TaskWorkerPool>( |
139 | 172 | "MOVE", 1, [&engine, exec_env](auto&& task) { return move_dir_callback(engine, exec_env, task); }); |
140 | | |
141 | 6 | _workers[TTaskType::COMPACTION] = std::make_unique<TaskWorkerPool>( |
142 | 6 | "SUBMIT_TABLE_COMPACTION", 1, [&engine](auto&& task) { return submit_table_compaction_callback(engine, task); }); |
143 | | |
144 | 6 | _workers[TTaskType::PUSH_STORAGE_POLICY] = std::make_unique<TaskWorkerPool>( |
145 | 9 | "PUSH_STORAGE_POLICY", 1, [&engine](auto&& task) { return push_storage_policy_callback(engine, task); }); |
146 | | |
147 | 6 | _workers[TTaskType::PUSH_INDEX_POLICY] = std::make_unique<TaskWorkerPool>( |
148 | 6 | "PUSH_INDEX_POLICY", 1, [](auto&& task) { return push_index_policy_callback(task); }); |
149 | | |
150 | 6 | _workers[TTaskType::PUSH_COOLDOWN_CONF] = std::make_unique<TaskWorkerPool>( |
151 | 6 | "PUSH_COOLDOWN_CONF", 1, [&engine](auto&& task) { return push_cooldown_conf_callback(engine, task); }); |
152 | | |
153 | 6 | _workers[TTaskType::CREATE] = std::make_unique<TaskWorkerPool>( |
154 | 6.06k | "CREATE_TABLE", config::create_tablet_worker_count, [&engine](auto&& task) { return create_tablet_callback(engine, task); }); |
155 | | |
156 | 6 | _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>( |
157 | 11.4k | "DROP_TABLE", config::drop_tablet_worker_count, [&engine](auto&& task) { return drop_tablet_callback(engine, task); }); |
158 | | |
159 | 6 | _workers[TTaskType::PUBLISH_VERSION] = std::make_unique<PublishVersionWorkerPool>(engine); |
160 | | |
161 | 6 | _workers[TTaskType::CLEAR_TRANSACTION_TASK] = std::make_unique<TaskWorkerPool>( |
162 | 14 | "CLEAR_TRANSACTION_TASK", config::clear_transaction_task_worker_count, [&engine](auto&& task) { return clear_transaction_task_callback(engine, task); }); |
163 | | |
164 | 6 | _workers[TTaskType::PUSH] = std::make_unique<PushTaskWorkerPool>(engine); |
165 | | |
166 | 6 | _workers[TTaskType::UPDATE_TABLET_META_INFO] = std::make_unique<TaskWorkerPool>( |
167 | 6 | "UPDATE_TABLET_META_INFO", 1, [&engine](auto&& task) { return update_tablet_meta_callback(engine, task); }); |
168 | | |
169 | 6 | _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>( |
170 | 6 | "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); }); |
171 | | |
172 | 6 | _workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>( |
173 | 6 | "CLONE", config::clone_worker_count,config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); }); |
174 | | |
175 | 6 | _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>( |
176 | 6 | "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); }); |
177 | | |
178 | 6 | _workers[TTaskType::GC_BINLOG] = std::make_unique<TaskWorkerPool>( |
179 | 6 | "GC_BINLOG", 1, [&engine](auto&& task) { return gc_binlog_callback(engine, task); }); |
180 | | |
181 | 6 | _workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>( |
182 | 6 | "CLEAN_TRASH", 1, [&engine](auto&& task) {return clean_trash_callback(engine, task); }); |
183 | | |
184 | 6 | _workers[TTaskType::CLEAN_UDF_CACHE] = std::make_unique<TaskWorkerPool>( |
185 | 6 | "CLEAN_UDF_CACHE", 1, [](auto&& task) {return clean_udf_cache_callback(task); }); |
186 | | |
187 | 6 | _workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>( |
188 | 2.24k | "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); |
189 | | |
190 | 6 | _report_workers.push_back(std::make_unique<ReportWorker>( |
191 | 675 | "REPORT_TASK", _cluster_info, config::report_task_interval_seconds, [&cluster_info = _cluster_info] { report_task_callback(cluster_info); })); |
192 | | |
193 | 6 | _report_workers.push_back(std::make_unique<ReportWorker>( |
194 | 297 | "REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_disk_callback(engine, cluster_info); })); |
195 | | |
196 | 6 | _report_workers.push_back(std::make_unique<ReportWorker>( |
197 | 141 | "REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info] { report_tablet_callback(engine, cluster_info); })); |
198 | | |
199 | 6 | _report_workers.push_back(std::make_unique<ReportWorker>( |
200 | 882 | "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds,[&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); })); |
201 | | // clang-format on |
202 | | |
203 | 6 | exec_env->storage_engine().to_local().workers = &_workers; |
204 | 6 | } |
205 | | |
// Cloud-mode worker registration: a smaller subset of the local-mode pools,
// backed by CloudStorageEngine callbacks, plus the periodic report workers.
void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) {
    // NOTE(review): unlike every other pool below, this lambda discards
    // cloud_push_callback's result (no `return`) -- confirm intentional.
    _workers[TTaskType::PUSH] = std::make_unique<TaskWorkerPool>(
            "PUSH", config::delete_worker_count,
            [&engine](auto&& task) { cloud_push_callback(engine, task); });
    // TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker

    _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
            "ALTER_TABLE", config::alter_tablet_worker_count,
            [&engine](auto&& task) { return alter_cloud_tablet_callback(engine, task); });

    _workers[TTaskType::CALCULATE_DELETE_BITMAP] = std::make_unique<TaskWorkerPool>(
            "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
            [&engine](auto&& task) { return calc_delete_bitmap_callback(engine, task); });

    // cloud, drop tablet just clean clear_cache, so just one thread do it
    _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
            "DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });

    _workers[TTaskType::PUSH_INDEX_POLICY] = std::make_unique<TaskWorkerPool>(
            "PUSH_INDEX_POLICY", 1, [](auto&& task) { return push_index_policy_callback(task); });

    _workers[TTaskType::DOWNLOAD] = std::make_unique<TaskWorkerPool>(
            "DOWNLOAD", config::download_worker_count,
            [&engine, exec_env](auto&& task) { return download_callback(engine, exec_env, task); });

    _workers[TTaskType::MOVE] = std::make_unique<TaskWorkerPool>(
            "MOVE", 1,
            [&engine, exec_env](auto&& task) { return move_dir_callback(engine, exec_env, task); });

    _workers[TTaskType::RELEASE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
            "RELEASE_SNAPSHOT", config::release_snapshot_worker_count,
            [&engine](auto&& task) { return release_snapshot_callback(engine, task); });

    _workers[TTaskType::ALTER_INVERTED_INDEX] = std::make_unique<TaskWorkerPool>(
            "ALTER_INVERTED_INDEX", config::alter_index_worker_count,
            [&engine](auto&& task) { return alter_cloud_index_callback(engine, task); });

    // Periodic reporters that push BE state to the FE master.
    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
            [&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds,
            [&engine, &cluster_info = _cluster_info] {
                report_disk_callback(engine, cluster_info);
            }));

    // Tablet reporting is optional in cloud mode.
    if (config::enable_cloud_tablet_report) {
        _report_workers.push_back(std::make_unique<ReportWorker>(
                "REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,
                [&engine, &cluster_info = _cluster_info] {
                    report_tablet_callback(engine, cluster_info);
                }));
    }

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds,
            [&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); }));
}
265 | | |
266 | | // TODO(lingbin): each task in the batch may have it own status or FE must check and |
267 | | // resend request when something is wrong(BE may need some logic to guarantee idempotence. |
268 | | void AgentServer::submit_tasks(TAgentResult& agent_result, |
269 | 6.48k | const std::vector<TAgentTaskRequest>& tasks) { |
270 | 6.48k | Status ret_st; |
271 | | |
272 | | // TODO check cluster_info here if it is the same with that of heartbeat rpc |
273 | 6.48k | if (_cluster_info->master_fe_addr.hostname.empty() || _cluster_info->master_fe_addr.port == 0) { |
274 | 0 | Status st = Status::Cancelled("Have not get FE Master heartbeat yet"); |
275 | 0 | st.to_thrift(&agent_result.status); |
276 | 0 | return; |
277 | 0 | } |
278 | | |
279 | 22.9k | for (auto&& task : tasks) { |
280 | 22.9k | VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str(); |
281 | 22.9k | auto task_type = task.task_type; |
282 | 22.9k | if (task_type == TTaskType::REALTIME_PUSH) { |
283 | 22 | task_type = TTaskType::PUSH; |
284 | 22 | } |
285 | 22.9k | int64_t signature = task.signature; |
286 | 22.9k | if (auto it = _workers.find(task_type); it != _workers.end()) { |
287 | 22.9k | auto& worker = it->second; |
288 | 22.9k | ret_st = worker->submit_task(task); |
289 | 22.9k | } else { |
290 | 0 | ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type", |
291 | 0 | signature, task.task_type); |
292 | 0 | } |
293 | | |
294 | 22.9k | if (!ret_st.ok()) { |
295 | 0 | LOG_WARNING("failed to submit task").tag("task", task).error(ret_st); |
296 | | // For now, all tasks in the batch share one status, so if any task |
297 | | // was failed to submit, we can only return error to FE(even when some |
298 | | // tasks have already been successfully submitted). |
299 | | // However, Fe does not check the return status of submit_tasks() currently, |
300 | | // and it is not sure that FE will retry when something is wrong, so here we |
301 | | // only print an warning log and go on(i.e. do not break current loop), |
302 | | // to ensure every task can be submitted once. It is OK for now, because the |
303 | | // ret_st can be error only when it encounters an wrong task_type and |
304 | | // req-member in TAgentTaskRequest, which is basically impossible. |
305 | | // TODO(lingbin): check the logic in FE again later. |
306 | 0 | } |
307 | 22.9k | } |
308 | | |
309 | 6.48k | ret_st.to_thrift(&agent_result.status); |
310 | 6.48k | } |
311 | | |
312 | | void AgentServer::publish_cluster_state(TAgentResult& t_agent_result, |
313 | 0 | const TAgentPublishRequest& request) { |
314 | 0 | Status status = Status::NotSupported("deprecated method(publish_cluster_state) was invoked"); |
315 | 0 | status.to_thrift(&t_agent_result.status); |
316 | 0 | } |
317 | | |
318 | 0 | void AgentServer::stop_report_workers() { |
319 | 0 | for (auto& work : _report_workers) { |
320 | 0 | work->stop(); |
321 | 0 | } |
322 | 0 | } |
323 | | |
324 | | } // namespace doris |