be/src/agent/task_worker_pool.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "agent/task_worker_pool.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <fmt/format.h> |
22 | | #include <gen_cpp/AgentService_types.h> |
23 | | #include <gen_cpp/DataSinks_types.h> |
24 | | #include <gen_cpp/HeartbeatService_types.h> |
25 | | #include <gen_cpp/MasterService_types.h> |
26 | | #include <gen_cpp/Status_types.h> |
27 | | #include <gen_cpp/Types_types.h> |
28 | | #include <unistd.h> |
29 | | |
30 | | #include <algorithm> |
31 | | // IWYU pragma: no_include <bits/chrono.h> |
32 | | #include <thrift/protocol/TDebugProtocol.h> |
33 | | |
34 | | #include <atomic> |
35 | | #include <chrono> // IWYU pragma: keep |
36 | | #include <ctime> |
37 | | #include <functional> |
38 | | #include <memory> |
39 | | #include <mutex> |
40 | | #include <shared_mutex> |
41 | | #include <sstream> |
42 | | #include <string> |
43 | | #include <thread> |
44 | | #include <type_traits> |
45 | | #include <utility> |
46 | | #include <vector> |
47 | | |
48 | | #include "agent/utils.h" |
49 | | #include "cloud/cloud_delete_task.h" |
50 | | #include "cloud/cloud_engine_calc_delete_bitmap_task.h" |
51 | | #include "cloud/cloud_schema_change_job.h" |
52 | | #include "cloud/cloud_snapshot_loader.h" |
53 | | #include "cloud/cloud_snapshot_mgr.h" |
54 | | #include "cloud/cloud_tablet.h" |
55 | | #include "cloud/cloud_tablet_mgr.h" |
56 | | #include "cloud/config.h" |
57 | | #include "common/config.h" |
58 | | #include "common/logging.h" |
59 | | #include "common/metrics/doris_metrics.h" |
60 | | #include "common/status.h" |
61 | | #include "io/fs/file_system.h" |
62 | | #include "io/fs/hdfs_file_system.h" |
63 | | #include "io/fs/local_file_system.h" |
64 | | #include "io/fs/obj_storage_client.h" |
65 | | #include "io/fs/path.h" |
66 | | #include "io/fs/remote_file_system.h" |
67 | | #include "io/fs/s3_file_system.h" |
68 | | #include "runtime/exec_env.h" |
69 | | #include "runtime/fragment_mgr.h" |
70 | | #include "runtime/index_policy/index_policy_mgr.h" |
71 | | #include "runtime/memory/global_memory_arbitrator.h" |
72 | | #include "runtime/snapshot_loader.h" |
73 | | #include "runtime/user_function_cache.h" |
74 | | #include "service/backend_options.h" |
75 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
76 | | #include "storage/data_dir.h" |
77 | | #include "storage/olap_common.h" |
78 | | #include "storage/rowset/rowset_meta.h" |
79 | | #include "storage/snapshot/snapshot_manager.h" |
80 | | #include "storage/storage_engine.h" |
81 | | #include "storage/storage_policy.h" |
82 | | #include "storage/tablet/tablet.h" |
83 | | #include "storage/tablet/tablet_manager.h" |
84 | | #include "storage/tablet/tablet_meta.h" |
85 | | #include "storage/tablet/tablet_schema.h" |
86 | | #include "storage/task/engine_batch_load_task.h" |
87 | | #include "storage/task/engine_checksum_task.h" |
88 | | #include "storage/task/engine_clone_task.h" |
89 | | #include "storage/task/engine_cloud_index_change_task.h" |
90 | | #include "storage/task/engine_index_change_task.h" |
91 | | #include "storage/task/engine_publish_version_task.h" |
92 | | #include "storage/task/engine_storage_migration_task.h" |
93 | | #include "storage/txn/txn_manager.h" |
94 | | #include "storage/utils.h" |
95 | | #include "util/brpc_client_cache.h" |
96 | | #include "util/debug_points.h" |
97 | | #include "util/jni-util.h" |
98 | | #include "util/mem_info.h" |
99 | | #include "util/random.h" |
100 | | #include "util/s3_util.h" |
101 | | #include "util/stopwatch.hpp" |
102 | | #include "util/threadpool.h" |
103 | | #include "util/time.h" |
104 | | #include "util/trace.h" |
105 | | |
106 | | namespace doris { |
107 | | #include "common/compile_check_begin.h" |
108 | | using namespace ErrorCode; |
109 | | |
110 | | namespace { |
111 | | |
112 | | std::mutex s_task_signatures_mtx; |
113 | | std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures; |
114 | | |
115 | | std::atomic_ulong s_report_version(time(nullptr) * 100000); |
116 | | |
117 | 19.6k | void increase_report_version() { |
118 | 19.6k | s_report_version.fetch_add(1, std::memory_order_relaxed); |
119 | 19.6k | } |
120 | | |
121 | | // FIXME(plat1ko): Paired register and remove task info |
122 | 75.5k | bool register_task_info(const TTaskType::type task_type, int64_t signature) { |
123 | 75.5k | if (task_type == TTaskType::type::PUSH_STORAGE_POLICY || |
124 | 75.5k | task_type == TTaskType::type::PUSH_COOLDOWN_CONF || |
125 | 75.5k | task_type == TTaskType::type::COMPACTION) { |
126 | | // no need to report task of these types |
127 | 11 | return true; |
128 | 11 | } |
129 | | |
130 | 75.5k | if (signature == -1) { // No need to report task with unintialized signature |
131 | 2.70k | return true; |
132 | 2.70k | } |
133 | | |
134 | 72.8k | std::lock_guard lock(s_task_signatures_mtx); |
135 | 72.8k | auto& set = s_task_signatures[task_type]; |
136 | 72.8k | return set.insert(signature).second; |
137 | 75.5k | } |
138 | | |
139 | 45.0k | void remove_task_info(const TTaskType::type task_type, int64_t signature) { |
140 | 45.0k | size_t queue_size; |
141 | 45.0k | { |
142 | 45.0k | std::lock_guard lock(s_task_signatures_mtx); |
143 | 45.0k | auto& set = s_task_signatures[task_type]; |
144 | 45.0k | set.erase(signature); |
145 | 45.0k | queue_size = set.size(); |
146 | 45.0k | } |
147 | | |
148 | 18.4E | VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature |
149 | 18.4E | << ", queue_size=" << queue_size; |
150 | 45.0k | } |
151 | | |
152 | 44.5k | void finish_task(const TFinishTaskRequest& finish_task_request) { |
153 | | // Return result to FE |
154 | 44.5k | TMasterResult result; |
155 | 44.5k | uint32_t try_time = 0; |
156 | 44.5k | constexpr int TASK_FINISH_MAX_RETRY = 3; |
157 | 44.8k | while (try_time < TASK_FINISH_MAX_RETRY) { |
158 | 44.8k | DorisMetrics::instance()->finish_task_requests_total->increment(1); |
159 | 44.8k | Status client_status = |
160 | 44.8k | MasterServerClient::instance()->finish_task(finish_task_request, &result); |
161 | | |
162 | 45.0k | if (client_status.ok()) { |
163 | 45.0k | break; |
164 | 18.4E | } else { |
165 | 18.4E | DorisMetrics::instance()->finish_task_requests_failed->increment(1); |
166 | 18.4E | LOG_WARNING("failed to finish task") |
167 | 18.4E | .tag("type", finish_task_request.task_type) |
168 | 18.4E | .tag("signature", finish_task_request.signature) |
169 | 18.4E | .error(result.status); |
170 | 18.4E | try_time += 1; |
171 | 18.4E | } |
172 | 18.4E | sleep(1); |
173 | 18.4E | } |
174 | 44.5k | } |
175 | | |
176 | | Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id, |
177 | 0 | const TSchemaHash schema_hash, TTabletInfo* tablet_info) { |
178 | 0 | tablet_info->__set_tablet_id(tablet_id); |
179 | 0 | tablet_info->__set_schema_hash(schema_hash); |
180 | 0 | return engine.tablet_manager()->report_tablet_info(tablet_info); |
181 | 0 | } |
182 | | |
183 | 1.36k | void random_sleep(int second) { |
184 | 1.36k | Random rnd(static_cast<uint32_t>(UnixMillis())); |
185 | 1.36k | sleep(rnd.Uniform(second) + 1); |
186 | 1.36k | } |
187 | | |
188 | | void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature, |
189 | 0 | const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { |
190 | 0 | Status status; |
191 | |
|
192 | 0 | std::string_view process_name = "alter tablet"; |
193 | | // Check last schema change status, if failed delete tablet file |
194 | | // Do not need to adjust delete success or not |
195 | | // Because if delete failed create rollup will failed |
196 | 0 | TTabletId new_tablet_id = 0; |
197 | 0 | TSchemaHash new_schema_hash = 0; |
198 | 0 | if (status.ok()) { |
199 | 0 | new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; |
200 | 0 | new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; |
201 | 0 | auto mem_tracker = MemTrackerLimiter::create_shared( |
202 | 0 | MemTrackerLimiter::Type::SCHEMA_CHANGE, |
203 | 0 | fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", |
204 | 0 | std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), |
205 | 0 | std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), |
206 | 0 | engine.memory_limitation_bytes_per_thread_for_schema_change())); |
207 | 0 | SCOPED_ATTACH_TASK(mem_tracker); |
208 | 0 | DorisMetrics::instance()->create_rollup_requests_total->increment(1); |
209 | 0 | Status res = Status::OK(); |
210 | 0 | try { |
211 | 0 | LOG_INFO("start {}", process_name) |
212 | 0 | .tag("signature", agent_task_req.signature) |
213 | 0 | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
214 | 0 | .tag("new_tablet_id", new_tablet_id) |
215 | 0 | .tag("mem_limit", |
216 | 0 | engine.memory_limitation_bytes_per_thread_for_schema_change()); |
217 | 0 | SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2, |
218 | 0 | std::to_string(agent_task_req.alter_tablet_req_v2.__isset.job_id |
219 | 0 | ? agent_task_req.alter_tablet_req_v2.job_id |
220 | 0 | : 0)); |
221 | 0 | status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2); |
222 | 0 | } catch (const Exception& e) { |
223 | 0 | status = e.to_status(); |
224 | 0 | } |
225 | 0 | if (!status.ok()) { |
226 | 0 | DorisMetrics::instance()->create_rollup_requests_failed->increment(1); |
227 | 0 | } |
228 | 0 | } |
229 | |
|
230 | 0 | if (status.ok()) { |
231 | 0 | increase_report_version(); |
232 | 0 | } |
233 | | |
234 | | // Return result to fe |
235 | 0 | finish_task_request->__set_backend(BackendOptions::get_local_backend()); |
236 | 0 | finish_task_request->__set_report_version(s_report_version); |
237 | 0 | finish_task_request->__set_task_type(task_type); |
238 | 0 | finish_task_request->__set_signature(signature); |
239 | |
|
240 | 0 | std::vector<TTabletInfo> finish_tablet_infos; |
241 | 0 | if (status.ok()) { |
242 | 0 | TTabletInfo tablet_info; |
243 | 0 | status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info); |
244 | 0 | if (status.ok()) { |
245 | 0 | finish_tablet_infos.push_back(tablet_info); |
246 | 0 | } |
247 | 0 | } |
248 | |
|
249 | 0 | if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) { |
250 | 0 | LOG_WARNING("failed to {}", process_name) |
251 | 0 | .tag("signature", agent_task_req.signature) |
252 | 0 | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
253 | 0 | .tag("new_tablet_id", new_tablet_id) |
254 | 0 | .error(status); |
255 | 0 | } else { |
256 | 0 | finish_task_request->__set_finish_tablet_infos(finish_tablet_infos); |
257 | 0 | LOG_INFO("successfully {}", process_name) |
258 | 0 | .tag("signature", agent_task_req.signature) |
259 | 0 | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
260 | 0 | .tag("new_tablet_id", new_tablet_id); |
261 | 0 | } |
262 | 0 | finish_task_request->__set_task_status(status.to_thrift()); |
263 | 0 | } |
264 | | |
265 | | void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req, |
266 | | int64_t signature, const TTaskType::type task_type, |
267 | 10.6k | TFinishTaskRequest* finish_task_request) { |
268 | 10.6k | Status status; |
269 | | |
270 | 10.6k | std::string_view process_name = "alter tablet"; |
271 | | // Check last schema change status, if failed delete tablet file |
272 | | // Do not need to adjust delete success or not |
273 | | // Because if delete failed create rollup will failed |
274 | 10.6k | TTabletId new_tablet_id = 0; |
275 | 10.6k | new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; |
276 | 10.6k | auto mem_tracker = MemTrackerLimiter::create_shared( |
277 | 10.6k | MemTrackerLimiter::Type::SCHEMA_CHANGE, |
278 | 10.6k | fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", |
279 | 10.6k | std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), |
280 | 10.6k | std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), |
281 | 10.6k | engine.memory_limitation_bytes_per_thread_for_schema_change())); |
282 | 10.6k | SCOPED_ATTACH_TASK(mem_tracker); |
283 | 10.6k | DorisMetrics::instance()->create_rollup_requests_total->increment(1); |
284 | | |
285 | 10.6k | LOG_INFO("start {}", process_name) |
286 | 10.6k | .tag("signature", agent_task_req.signature) |
287 | 10.6k | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
288 | 10.6k | .tag("new_tablet_id", new_tablet_id) |
289 | 10.6k | .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change()); |
290 | 10.6k | DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); |
291 | 10.6k | CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id), |
292 | 10.6k | agent_task_req.alter_tablet_req_v2.expiration); |
293 | 10.6k | status = [&]() { |
294 | 10.6k | HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( |
295 | 10.6k | job.process_alter_tablet(agent_task_req.alter_tablet_req_v2), |
296 | 10.6k | [&](const doris::Exception& ex) { |
297 | 10.6k | DorisMetrics::instance()->create_rollup_requests_failed->increment(1); |
298 | 10.6k | job.clean_up_on_failure(); |
299 | 10.6k | }); |
300 | 10.0k | return Status::OK(); |
301 | 10.6k | }(); |
302 | | |
303 | 10.6k | if (status.ok()) { |
304 | 9.99k | increase_report_version(); |
305 | 9.99k | LOG_INFO("successfully {}", process_name) |
306 | 9.99k | .tag("signature", agent_task_req.signature) |
307 | 9.99k | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
308 | 9.99k | .tag("new_tablet_id", new_tablet_id); |
309 | 9.99k | } else { |
310 | 673 | LOG_WARNING("failed to {}", process_name) |
311 | 673 | .tag("signature", agent_task_req.signature) |
312 | 673 | .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
313 | 673 | .tag("new_tablet_id", new_tablet_id) |
314 | 673 | .error(status); |
315 | 673 | } |
316 | | |
317 | | // Return result to fe |
318 | 10.6k | finish_task_request->__set_backend(BackendOptions::get_local_backend()); |
319 | 10.6k | finish_task_request->__set_report_version(s_report_version); |
320 | 10.6k | finish_task_request->__set_task_type(task_type); |
321 | 10.6k | finish_task_request->__set_signature(signature); |
322 | 10.6k | finish_task_request->__set_task_status(status.to_thrift()); |
323 | 10.6k | } |
324 | | |
325 | | Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req, |
326 | 0 | TabletSharedPtr& tablet, DataDir** dest_store) { |
327 | 0 | int64_t tablet_id = req.tablet_id; |
328 | 0 | tablet = engine.tablet_manager()->get_tablet(tablet_id); |
329 | 0 | if (tablet == nullptr) { |
330 | 0 | return Status::InternalError("could not find tablet {}", tablet_id); |
331 | 0 | } |
332 | | |
333 | 0 | if (req.__isset.data_dir) { |
334 | | // request specify the data dir |
335 | 0 | *dest_store = engine.get_store(req.data_dir); |
336 | 0 | if (*dest_store == nullptr) { |
337 | 0 | return Status::InternalError("could not find data dir {}", req.data_dir); |
338 | 0 | } |
339 | 0 | } else { |
340 | | // this is a storage medium |
341 | | // get data dir by storage medium |
342 | | |
343 | | // judge case when no need to migrate |
344 | 0 | uint32_t count = engine.available_storage_medium_type_count(); |
345 | 0 | if (count <= 1) { |
346 | 0 | return Status::InternalError("available storage medium type count is less than 1"); |
347 | 0 | } |
348 | | // check current tablet storage medium |
349 | 0 | TStorageMedium::type storage_medium = req.storage_medium; |
350 | 0 | TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium(); |
351 | 0 | if (src_storage_medium == storage_medium) { |
352 | 0 | return Status::InternalError("tablet is already on specified storage medium {}", |
353 | 0 | storage_medium); |
354 | 0 | } |
355 | | // get a random store of specified storage medium |
356 | 0 | auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium); |
357 | 0 | if (stores.empty()) { |
358 | 0 | return Status::InternalError("failed to get root path for create tablet"); |
359 | 0 | } |
360 | | |
361 | 0 | *dest_store = stores[0]; |
362 | 0 | } |
363 | 0 | if (tablet->data_dir()->path() == (*dest_store)->path()) { |
364 | 0 | LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); |
365 | 0 | return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}", |
366 | 0 | tablet->data_dir()->path()); |
367 | 0 | } |
368 | | |
369 | | // check local disk capacity |
370 | 0 | int64_t tablet_size = tablet->tablet_local_size(); |
371 | 0 | if ((*dest_store)->reach_capacity_limit(tablet_size)) { |
372 | 0 | return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}", |
373 | 0 | (*dest_store)->path(), tablet_size); |
374 | 0 | } |
375 | 0 | return Status::OK(); |
376 | 0 | } |
377 | | |
378 | | // Return `true` if report success |
379 | | bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info, |
380 | 3.00k | std::string_view name) { |
381 | 3.00k | TMasterResult result; |
382 | 3.00k | Status status = MasterServerClient::instance()->report(request, &result); |
383 | 3.00k | if (!status.ok()) [[unlikely]] { |
384 | 0 | LOG_WARNING("failed to report {}", name) |
385 | 0 | .tag("host", cluster_info->master_fe_addr.hostname) |
386 | 0 | .tag("port", cluster_info->master_fe_addr.port) |
387 | 0 | .error(status); |
388 | 0 | return false; |
389 | 0 | } |
390 | | |
391 | 3.00k | else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { |
392 | 0 | LOG_WARNING("failed to report {}", name) |
393 | 0 | .tag("host", cluster_info->master_fe_addr.hostname) |
394 | 0 | .tag("port", cluster_info->master_fe_addr.port) |
395 | 0 | .error(result.status); |
396 | 0 | return false; |
397 | 0 | } |
398 | | |
399 | 3.00k | return true; |
400 | 3.00k | } |
401 | | |
402 | | Status _submit_task(const TAgentTaskRequest& task, |
403 | 75.5k | std::function<Status(const TAgentTaskRequest&)> submit_op) { |
404 | 75.5k | const TTaskType::type task_type = task.task_type; |
405 | 75.5k | int64_t signature = task.signature; |
406 | | |
407 | 75.5k | std::string type_str; |
408 | 75.5k | EnumToString(TTaskType, task_type, type_str); |
409 | 18.4E | VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature; |
410 | | |
411 | 75.5k | if (!register_task_info(task_type, signature)) { |
412 | 1.26k | LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature); |
413 | | // Duplicated task request, just return OK |
414 | 1.26k | return Status::OK(); |
415 | 1.26k | } |
416 | | |
417 | | // TODO(plat1ko): check task request member |
418 | | |
419 | | // Set the receiving time of task so that we can determine whether it is timed out later |
420 | | // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService |
421 | | // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok |
422 | 74.2k | (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr)); |
423 | 74.2k | auto st = submit_op(task); |
424 | 74.2k | if (!st.ok()) [[unlikely]] { |
425 | 1 | LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature); |
426 | 1 | return st; |
427 | 1 | } |
428 | | |
429 | 74.2k | LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); |
430 | 74.2k | return Status::OK(); |
431 | 74.2k | } |
432 | | |
433 | | bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version"); |
434 | | |
435 | | bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX"); |
436 | | bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY"); |
437 | | bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD"); |
438 | | bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD"); |
439 | | bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT"); |
440 | | bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT"); |
441 | | bvar::Adder<uint64_t> MOVE_count("task", "MOVE"); |
442 | | bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION"); |
443 | | bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY"); |
444 | | bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count("task", "PUSH_INDEX_POLICY"); |
445 | | bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF"); |
446 | | bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE"); |
447 | | bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE"); |
448 | | bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION"); |
449 | | bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK"); |
450 | | bvar::Adder<uint64_t> DELETE_count("task", "DELETE"); |
451 | | bvar::Adder<uint64_t> PUSH_count("task", "PUSH"); |
452 | | bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO"); |
453 | | bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE"); |
454 | | bvar::Adder<uint64_t> CLONE_count("task", "CLONE"); |
455 | | bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE"); |
456 | | bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG"); |
457 | | bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION"); |
458 | | bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP"); |
459 | | |
460 | 148k | void add_task_count(const TAgentTaskRequest& task, int n) { |
461 | | // clang-format off |
462 | 148k | switch (task.task_type) { |
463 | 0 | #define ADD_TASK_COUNT(type) \ |
464 | 67.6k | case TTaskType::type: \ |
465 | 135k | type##_count << n; \ |
466 | 67.6k | return; |
467 | 1.14k | ADD_TASK_COUNT(ALTER_INVERTED_INDEX) |
468 | 0 | ADD_TASK_COUNT(CHECK_CONSISTENCY) |
469 | 32 | ADD_TASK_COUNT(UPLOAD) |
470 | 44 | ADD_TASK_COUNT(DOWNLOAD) |
471 | 640 | ADD_TASK_COUNT(MAKE_SNAPSHOT) |
472 | 640 | ADD_TASK_COUNT(RELEASE_SNAPSHOT) |
473 | 344 | ADD_TASK_COUNT(MOVE) |
474 | 0 | ADD_TASK_COUNT(COMPACTION) |
475 | 18 | ADD_TASK_COUNT(PUSH_STORAGE_POLICY) |
476 | 32 | ADD_TASK_COUNT(PUSH_INDEX_POLICY) |
477 | 4 | ADD_TASK_COUNT(PUSH_COOLDOWN_CONF) |
478 | 12.4k | ADD_TASK_COUNT(CREATE) |
479 | 22.7k | ADD_TASK_COUNT(DROP) |
480 | 5.48k | ADD_TASK_COUNT(PUBLISH_VERSION) |
481 | 28 | ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK) |
482 | 0 | ADD_TASK_COUNT(UPDATE_TABLET_META_INFO) |
483 | 0 | ADD_TASK_COUNT(CLONE) |
484 | 0 | ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE) |
485 | 0 | ADD_TASK_COUNT(GC_BINLOG) |
486 | 5.34k | ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION) |
487 | 18.6k | ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP) |
488 | 0 | #undef ADD_TASK_COUNT |
489 | 6.49k | case TTaskType::REALTIME_PUSH: |
490 | 6.49k | case TTaskType::PUSH: |
491 | 6.49k | if (task.push_req.push_type == TPushType::LOAD_V2) { |
492 | 0 | PUSH_count << n; |
493 | 6.49k | } else if (task.push_req.push_type == TPushType::DELETE) { |
494 | 6.49k | DELETE_count << n; |
495 | 6.49k | } |
496 | 6.49k | return; |
497 | 21.3k | case TTaskType::ALTER: |
498 | 21.3k | { |
499 | 21.3k | ALTER_count << n; |
500 | | // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment |
501 | 21.3k | if (n > 0) { |
502 | | // only count fragment when task is actually starting |
503 | 10.6k | doris::g_fragment_executing_count << 1; |
504 | 10.6k | int64_t now = duration_cast<std::chrono::milliseconds>( |
505 | 10.6k | std::chrono::system_clock::now().time_since_epoch()) |
506 | 10.6k | .count(); |
507 | 10.6k | g_fragment_last_active_time.set_value(now); |
508 | 10.6k | } |
509 | 21.3k | return; |
510 | 6.49k | } |
511 | 53.0k | default: |
512 | 53.0k | return; |
513 | 148k | } |
514 | | // clang-format on |
515 | 148k | } |
516 | | |
517 | | bvar::Adder<uint64_t> report_task_total("report", "task_total"); |
518 | | bvar::Adder<uint64_t> report_task_failed("report", "task_failed"); |
519 | | bvar::Adder<uint64_t> report_disk_total("report", "disk_total"); |
520 | | bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed"); |
521 | | bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total"); |
522 | | bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed"); |
523 | | bvar::Adder<uint64_t> report_index_policy_total("report", "index_policy_total"); |
524 | | bvar::Adder<uint64_t> report_index_policy_failed("report", "index_policy_failed"); |
525 | | |
526 | | } // namespace |
527 | | |
528 | | TaskWorkerPool::TaskWorkerPool( |
529 | | std::string_view name, int worker_count, |
530 | | std::function<void(const TAgentTaskRequest& task)> callback, |
531 | | std::function<void(const TAgentTaskRequest& task)> pre_submit_callback) |
532 | 151 | : _callback(std::move(callback)), _pre_submit_callback(std::move(pre_submit_callback)) { |
533 | 151 | auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name)) |
534 | 151 | .set_min_threads(worker_count) |
535 | 151 | .set_max_threads(worker_count) |
536 | 151 | .build(&_thread_pool); |
537 | 151 | CHECK(st.ok()) << name << ": " << st; |
538 | 151 | } |
539 | | |
540 | 72 | TaskWorkerPool::~TaskWorkerPool() { |
541 | 72 | stop(); |
542 | 72 | } |
543 | | |
544 | 78 | void TaskWorkerPool::stop() { |
545 | 78 | if (_stopped.exchange(true)) { |
546 | 6 | return; |
547 | 6 | } |
548 | | |
549 | 72 | if (_thread_pool) { |
550 | 72 | _thread_pool->shutdown(); |
551 | 72 | } |
552 | 72 | } |
553 | | |
554 | 75.5k | Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { |
555 | 75.5k | return _submit_task(task, [this](auto&& task) { |
556 | 74.2k | if (_pre_submit_callback) { |
557 | 10.6k | _pre_submit_callback(task); |
558 | 10.6k | } |
559 | 74.2k | add_task_count(task, 1); |
560 | 74.2k | return _thread_pool->submit_func([this, task]() { |
561 | 74.2k | _callback(task); |
562 | 74.2k | add_task_count(task, -1); |
563 | 74.2k | }); |
564 | 74.2k | }); |
565 | 75.5k | } |
566 | | |
567 | | PriorTaskWorkerPool::PriorTaskWorkerPool( |
568 | | const std::string& name, int normal_worker_count, int high_prior_worker_count, |
569 | | std::function<void(const TAgentTaskRequest& task)> callback) |
570 | 13 | : _callback(std::move(callback)) { |
571 | 50 | for (int i = 0; i < normal_worker_count; ++i) { |
572 | 37 | auto st = Thread::create( |
573 | 37 | "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back()); |
574 | 37 | CHECK(st.ok()) << name << ": " << st; |
575 | 37 | } |
576 | | |
577 | 50 | for (int i = 0; i < high_prior_worker_count; ++i) { |
578 | 37 | auto st = Thread::create( |
579 | 37 | "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back()); |
580 | 37 | CHECK(st.ok()) << name << ": " << st; |
581 | 37 | } |
582 | 13 | } |
583 | | |
584 | 7 | PriorTaskWorkerPool::~PriorTaskWorkerPool() { |
585 | 7 | stop(); |
586 | 7 | } |
587 | | |
588 | 11 | void PriorTaskWorkerPool::stop() { |
589 | 11 | { |
590 | 11 | std::lock_guard lock(_mtx); |
591 | 11 | if (_stopped) { |
592 | 4 | return; |
593 | 4 | } |
594 | | |
595 | 7 | _stopped = true; |
596 | 7 | } |
597 | 0 | _normal_condv.notify_all(); |
598 | 7 | _high_prior_condv.notify_all(); |
599 | | |
600 | 38 | for (auto&& w : _workers) { |
601 | 38 | if (w) { |
602 | 38 | w->join(); |
603 | 38 | } |
604 | 38 | } |
605 | 7 | } |
606 | | |
607 | 6 | Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { |
608 | 6 | return _submit_task(task, [this](auto&& task) { |
609 | 6 | auto req = std::make_unique<TAgentTaskRequest>(task); |
610 | 6 | add_task_count(*req, 1); |
611 | 6 | if (req->__isset.priority && req->priority == TPriority::HIGH) { |
612 | 4 | std::lock_guard lock(_mtx); |
613 | 4 | _high_prior_queue.push_back(std::move(req)); |
614 | 4 | _high_prior_condv.notify_one(); |
615 | 4 | _normal_condv.notify_one(); |
616 | 4 | } else { |
617 | 2 | std::lock_guard lock(_mtx); |
618 | 2 | _normal_queue.push_back(std::move(req)); |
619 | 2 | _normal_condv.notify_one(); |
620 | 2 | } |
621 | 6 | return Status::OK(); |
622 | 6 | }); |
623 | 6 | } |
624 | | |
625 | 0 | Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) { |
626 | 0 | const TTaskType::type task_type = task.task_type; |
627 | 0 | int64_t signature = task.signature; |
628 | 0 | std::string type_str; |
629 | 0 | EnumToString(TTaskType, task_type, type_str); |
630 | 0 | auto req = std::make_unique<TAgentTaskRequest>(task); |
631 | |
|
632 | 0 | DCHECK(req->__isset.priority && req->priority == TPriority::HIGH); |
633 | 0 | do { |
634 | 0 | std::lock_guard lock(s_task_signatures_mtx); |
635 | 0 | auto& set = s_task_signatures[task_type]; |
636 | 0 | if (!set.contains(signature)) { |
637 | | // If it doesn't exist, put it directly into the priority queue |
638 | 0 | add_task_count(*req, 1); |
639 | 0 | set.insert(signature); |
640 | 0 | std::lock_guard temp_lock(_mtx); |
641 | 0 | _high_prior_queue.push_back(std::move(req)); |
642 | 0 | _high_prior_condv.notify_one(); |
643 | 0 | _normal_condv.notify_one(); |
644 | 0 | break; |
645 | 0 | } else { |
646 | 0 | std::lock_guard temp_lock(_mtx); |
647 | 0 | for (auto it = _normal_queue.begin(); it != _normal_queue.end();) { |
648 | | // If it exists in the normal queue, cancel the task in the normal queue |
649 | 0 | if ((*it)->signature == signature) { |
650 | 0 | _normal_queue.erase(it); // cancel the original task |
651 | 0 | _high_prior_queue.push_back(std::move(req)); // add the new task to the queue |
652 | 0 | _high_prior_condv.notify_one(); |
653 | 0 | _normal_condv.notify_one(); |
654 | 0 | break; |
655 | 0 | } else { |
656 | 0 | ++it; // doesn't meet the condition, continue to the next one |
657 | 0 | } |
658 | 0 | } |
659 | | // If it exists in the high priority queue, no operation is needed |
660 | 0 | LOG_INFO("task has already existed in high prior queue.").tag("signature", signature); |
661 | 0 | } |
662 | 0 | } while (false); |
663 | | |
664 | | // Set the receiving time of task so that we can determine whether it is timed out later |
665 | 0 | task.__set_recv_time(time(nullptr)); |
666 | |
|
667 | 0 | LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature); |
668 | 0 | return Status::OK(); |
669 | 0 | } |
670 | | |
671 | 37 | void PriorTaskWorkerPool::normal_loop() { |
672 | 58 | while (true) { |
673 | 40 | std::unique_ptr<TAgentTaskRequest> req; |
674 | | |
675 | 40 | { |
676 | 40 | std::unique_lock lock(_mtx); |
677 | 61 | _normal_condv.wait(lock, [&] { |
678 | 61 | return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped; |
679 | 61 | }); |
680 | | |
681 | 40 | if (_stopped) { |
682 | 19 | return; |
683 | 19 | } |
684 | | |
685 | 21 | if (!_high_prior_queue.empty()) { |
686 | 1 | req = std::move(_high_prior_queue.front()); |
687 | 1 | _high_prior_queue.pop_front(); |
688 | 20 | } else if (!_normal_queue.empty()) { |
689 | 2 | req = std::move(_normal_queue.front()); |
690 | 2 | _normal_queue.pop_front(); |
691 | 18 | } else { |
692 | 18 | continue; |
693 | 18 | } |
694 | 21 | } |
695 | | |
696 | 3 | _callback(*req); |
697 | 3 | add_task_count(*req, -1); |
698 | 3 | } |
699 | 37 | } |
700 | | |
701 | 37 | void PriorTaskWorkerPool::high_prior_loop() { |
702 | 57 | while (true) { |
703 | 39 | std::unique_ptr<TAgentTaskRequest> req; |
704 | | |
705 | 39 | { |
706 | 39 | std::unique_lock lock(_mtx); |
707 | 61 | _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; }); |
708 | | |
709 | 39 | if (_stopped) { |
710 | 19 | return; |
711 | 19 | } |
712 | | |
713 | 20 | if (_high_prior_queue.empty()) { |
714 | 0 | continue; |
715 | 0 | } |
716 | | |
717 | 20 | req = std::move(_high_prior_queue.front()); |
718 | 20 | _high_prior_queue.pop_front(); |
719 | 20 | } |
720 | | |
721 | 0 | _callback(*req); |
722 | 20 | add_task_count(*req, -1); |
723 | 20 | } |
724 | 37 | } |
725 | | |
726 | | ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s, |
727 | | std::function<void()> callback) |
728 | 29 | : _name(std::move(name)) { |
729 | 29 | auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] { |
730 | 29 | auto& engine = ExecEnv::GetInstance()->storage_engine(); |
731 | 29 | engine.register_report_listener(this); |
732 | 3.10k | while (true) { |
733 | 3.08k | { |
734 | 3.08k | std::unique_lock lock(_mtx); |
735 | 3.08k | _condv.wait_for(lock, std::chrono::seconds(report_interval_s), |
736 | 6.16k | [&] { return _stopped || _signal; }); |
737 | | |
738 | 3.08k | if (_stopped) { |
739 | 13 | break; |
740 | 13 | } |
741 | | |
742 | 3.07k | if (_signal) { |
743 | | // Consume received signal |
744 | 60 | _signal = false; |
745 | 60 | } |
746 | 3.07k | } |
747 | | |
748 | 3.07k | if (cluster_info->master_fe_addr.port == 0) { |
749 | | // port == 0 means not received heartbeat yet |
750 | 47 | LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report"; |
751 | 47 | continue; |
752 | 47 | } |
753 | | |
754 | 3.02k | callback(); |
755 | 3.02k | } |
756 | 29 | engine.deregister_report_listener(this); |
757 | 29 | }; |
758 | | |
759 | 29 | auto st = Thread::create("ReportWorker", _name, report_loop, &_thread); |
760 | 29 | CHECK(st.ok()) << _name << ": " << st; |
761 | 29 | } |
762 | | |
763 | 13 | ReportWorker::~ReportWorker() { |
764 | 13 | stop(); |
765 | 13 | } |
766 | | |
767 | 61 | void ReportWorker::notify() { |
768 | 61 | { |
769 | 61 | std::lock_guard lock(_mtx); |
770 | 61 | _signal = true; |
771 | 61 | } |
772 | 61 | _condv.notify_all(); |
773 | 61 | } |
774 | | |
775 | 14 | void ReportWorker::stop() { |
776 | 14 | { |
777 | 14 | std::lock_guard lock(_mtx); |
778 | 14 | if (_stopped) { |
779 | 1 | return; |
780 | 1 | } |
781 | | |
782 | 13 | _stopped = true; |
783 | 13 | } |
784 | 0 | _condv.notify_all(); |
785 | 13 | if (_thread) { |
786 | 13 | _thread->join(); |
787 | 13 | } |
788 | 13 | } |
789 | | |
790 | 574 | void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
791 | 574 | const auto& alter_inverted_index_rq = req.alter_inverted_index_req; |
792 | 574 | LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature |
793 | 574 | << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
794 | 574 | << ", job_id=" << alter_inverted_index_rq.job_id; |
795 | | |
796 | 574 | Status status = Status::OK(); |
797 | 574 | auto tablet_ptr = engine.tablet_mgr().get_tablet(alter_inverted_index_rq.tablet_id); |
798 | 574 | if (tablet_ptr != nullptr) { |
799 | 574 | EngineCloudIndexChangeTask engine_task(engine, req.alter_inverted_index_req); |
800 | 574 | status = engine_task.execute(); |
801 | 574 | } else { |
802 | 0 | status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); |
803 | 0 | } |
804 | | |
805 | | // Return result to fe |
806 | 574 | TFinishTaskRequest finish_task_request; |
807 | 574 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
808 | 574 | finish_task_request.__set_task_type(req.task_type); |
809 | 574 | finish_task_request.__set_signature(req.signature); |
810 | 574 | if (!status.ok()) { |
811 | 9 | LOG(WARNING) << "[index_change]failed to alter inverted index task, signature=" |
812 | 9 | << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
813 | 9 | << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; |
814 | 565 | } else { |
815 | 565 | LOG(INFO) << "[index_change]successfully alter inverted index task, signature=" |
816 | 565 | << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
817 | 565 | << ", job_id=" << alter_inverted_index_rq.job_id; |
818 | 565 | } |
819 | 574 | finish_task_request.__set_task_status(status.to_thrift()); |
820 | 574 | finish_task(finish_task_request); |
821 | 574 | remove_task_info(req.task_type, req.signature); |
822 | 574 | } |
823 | | |
824 | 0 | void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
825 | 0 | const auto& alter_inverted_index_rq = req.alter_inverted_index_req; |
826 | 0 | LOG(INFO) << "get alter inverted index task. signature=" << req.signature |
827 | 0 | << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
828 | 0 | << ", job_id=" << alter_inverted_index_rq.job_id; |
829 | |
|
830 | 0 | Status status = Status::OK(); |
831 | 0 | auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id); |
832 | 0 | if (tablet_ptr != nullptr) { |
833 | 0 | EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq); |
834 | 0 | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
835 | 0 | status = engine_task.execute(); |
836 | 0 | } else { |
837 | 0 | status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); |
838 | 0 | } |
839 | | |
840 | | // Return result to fe |
841 | 0 | TFinishTaskRequest finish_task_request; |
842 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
843 | 0 | finish_task_request.__set_task_type(req.task_type); |
844 | 0 | finish_task_request.__set_signature(req.signature); |
845 | 0 | std::vector<TTabletInfo> finish_tablet_infos; |
846 | 0 | if (!status.ok()) { |
847 | 0 | LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature |
848 | 0 | << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
849 | 0 | << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; |
850 | 0 | } else { |
851 | 0 | LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature |
852 | 0 | << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
853 | 0 | << ", job_id=" << alter_inverted_index_rq.job_id; |
854 | 0 | TTabletInfo tablet_info; |
855 | 0 | status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id, |
856 | 0 | alter_inverted_index_rq.schema_hash, &tablet_info); |
857 | 0 | if (status.ok()) { |
858 | 0 | finish_tablet_infos.push_back(tablet_info); |
859 | 0 | } |
860 | 0 | finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); |
861 | 0 | } |
862 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
863 | 0 | finish_task(finish_task_request); |
864 | 0 | remove_task_info(req.task_type, req.signature); |
865 | 0 | } |
866 | | |
867 | 0 | void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
868 | 0 | LOG(INFO) << "get update tablet meta task. signature=" << req.signature; |
869 | |
|
870 | 0 | Status status; |
871 | 0 | const auto& update_tablet_meta_req = req.update_tablet_meta_info_req; |
872 | 0 | for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) { |
873 | 0 | auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id); |
874 | 0 | if (tablet == nullptr) { |
875 | 0 | status = Status::NotFound("tablet not found"); |
876 | 0 | LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id=" |
877 | 0 | << tablet_meta_info.tablet_id; |
878 | 0 | continue; |
879 | 0 | } |
880 | 0 | bool need_to_save = false; |
881 | 0 | if (tablet_meta_info.__isset.partition_id) { |
882 | | // for fix partition_id = 0 |
883 | 0 | LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id() |
884 | 0 | << "partition id from : " << tablet->tablet_meta()->partition_id() |
885 | 0 | << " to : " << tablet_meta_info.partition_id; |
886 | 0 | auto succ = engine.tablet_manager()->update_tablet_partition_id( |
887 | 0 | tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id()); |
888 | 0 | if (!succ) { |
889 | 0 | std::string err_msg = fmt::format( |
890 | 0 | "change be tablet id : {} partition_id : {} failed", |
891 | 0 | tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id); |
892 | 0 | LOG(WARNING) << err_msg; |
893 | 0 | status = Status::InvalidArgument(err_msg); |
894 | 0 | continue; |
895 | 0 | } |
896 | 0 | need_to_save = true; |
897 | 0 | } |
898 | 0 | if (tablet_meta_info.__isset.storage_policy_id) { |
899 | 0 | tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id); |
900 | 0 | need_to_save = true; |
901 | 0 | } |
902 | 0 | if (tablet_meta_info.__isset.is_in_memory) { |
903 | 0 | tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory( |
904 | 0 | tablet_meta_info.is_in_memory); |
905 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
906 | 0 | for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) { |
907 | 0 | rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory); |
908 | 0 | } |
909 | 0 | tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory); |
910 | 0 | need_to_save = true; |
911 | 0 | } |
912 | 0 | if (tablet_meta_info.__isset.compaction_policy) { |
913 | 0 | if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY && |
914 | 0 | tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) { |
915 | 0 | status = Status::InvalidArgument( |
916 | 0 | "invalid compaction policy, only support for size_based or " |
917 | 0 | "time_series"); |
918 | 0 | continue; |
919 | 0 | } |
920 | 0 | tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy); |
921 | 0 | need_to_save = true; |
922 | 0 | } |
923 | 0 | if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { |
924 | 0 | if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { |
925 | 0 | status = Status::InvalidArgument( |
926 | 0 | "only time series compaction policy support time series config"); |
927 | 0 | continue; |
928 | 0 | } |
929 | 0 | tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes( |
930 | 0 | tablet_meta_info.time_series_compaction_goal_size_mbytes); |
931 | 0 | need_to_save = true; |
932 | 0 | } |
933 | 0 | if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { |
934 | 0 | if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { |
935 | 0 | status = Status::InvalidArgument( |
936 | 0 | "only time series compaction policy support time series config"); |
937 | 0 | continue; |
938 | 0 | } |
939 | 0 | tablet->tablet_meta()->set_time_series_compaction_file_count_threshold( |
940 | 0 | tablet_meta_info.time_series_compaction_file_count_threshold); |
941 | 0 | need_to_save = true; |
942 | 0 | } |
943 | 0 | if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { |
944 | 0 | if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { |
945 | 0 | status = Status::InvalidArgument( |
946 | 0 | "only time series compaction policy support time series config"); |
947 | 0 | continue; |
948 | 0 | } |
949 | 0 | tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds( |
950 | 0 | tablet_meta_info.time_series_compaction_time_threshold_seconds); |
951 | 0 | need_to_save = true; |
952 | 0 | } |
953 | 0 | if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) { |
954 | 0 | if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { |
955 | 0 | status = Status::InvalidArgument( |
956 | 0 | "only time series compaction policy support time series config"); |
957 | 0 | continue; |
958 | 0 | } |
959 | 0 | tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold( |
960 | 0 | tablet_meta_info.time_series_compaction_empty_rowsets_threshold); |
961 | 0 | need_to_save = true; |
962 | 0 | } |
963 | 0 | if (tablet_meta_info.__isset.time_series_compaction_level_threshold) { |
964 | 0 | if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { |
965 | 0 | status = Status::InvalidArgument( |
966 | 0 | "only time series compaction policy support time series config"); |
967 | 0 | continue; |
968 | 0 | } |
969 | 0 | tablet->tablet_meta()->set_time_series_compaction_level_threshold( |
970 | 0 | tablet_meta_info.time_series_compaction_level_threshold); |
971 | 0 | need_to_save = true; |
972 | 0 | } |
973 | 0 | if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) { |
974 | 0 | tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group( |
975 | 0 | tablet_meta_info.vertical_compaction_num_columns_per_group); |
976 | 0 | need_to_save = true; |
977 | 0 | } |
978 | 0 | if (tablet_meta_info.__isset.replica_id) { |
979 | 0 | tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); |
980 | 0 | } |
981 | 0 | if (tablet_meta_info.__isset.binlog_config) { |
982 | | // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums |
983 | 0 | const auto& t_binlog_config = tablet_meta_info.binlog_config; |
984 | 0 | if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds || |
985 | 0 | !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) { |
986 | 0 | status = Status::InvalidArgument("invalid binlog config, some fields not set"); |
987 | 0 | LOG(WARNING) << fmt::format( |
988 | 0 | "invalid binlog config, some fields not set, tablet_id={}, " |
989 | 0 | "t_binlog_config={}", |
990 | 0 | tablet_meta_info.tablet_id, |
991 | 0 | apache::thrift::ThriftDebugString(t_binlog_config)); |
992 | 0 | continue; |
993 | 0 | } |
994 | | |
995 | 0 | BinlogConfig new_binlog_config; |
996 | 0 | new_binlog_config = tablet_meta_info.binlog_config; |
997 | 0 | LOG(INFO) << fmt::format( |
998 | 0 | "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, " |
999 | 0 | "new_binlog_config={}", |
1000 | 0 | tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(), |
1001 | 0 | new_binlog_config.to_string()); |
1002 | 0 | tablet->set_binlog_config(new_binlog_config); |
1003 | 0 | need_to_save = true; |
1004 | 0 | } |
1005 | 0 | if (tablet_meta_info.__isset.enable_single_replica_compaction) { |
1006 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
1007 | 0 | tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction( |
1008 | 0 | tablet_meta_info.enable_single_replica_compaction); |
1009 | 0 | for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) { |
1010 | 0 | rowset_meta->tablet_schema()->set_enable_single_replica_compaction( |
1011 | 0 | tablet_meta_info.enable_single_replica_compaction); |
1012 | 0 | } |
1013 | 0 | tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction( |
1014 | 0 | tablet_meta_info.enable_single_replica_compaction); |
1015 | 0 | need_to_save = true; |
1016 | 0 | } |
1017 | 0 | if (tablet_meta_info.__isset.disable_auto_compaction) { |
1018 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
1019 | 0 | tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction( |
1020 | 0 | tablet_meta_info.disable_auto_compaction); |
1021 | 0 | for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) { |
1022 | 0 | rowset_meta->tablet_schema()->set_disable_auto_compaction( |
1023 | 0 | tablet_meta_info.disable_auto_compaction); |
1024 | 0 | } |
1025 | 0 | tablet->tablet_schema_unlocked()->set_disable_auto_compaction( |
1026 | 0 | tablet_meta_info.disable_auto_compaction); |
1027 | 0 | need_to_save = true; |
1028 | 0 | } |
1029 | |
|
1030 | 0 | if (tablet_meta_info.__isset.skip_write_index_on_load) { |
1031 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
1032 | 0 | tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load( |
1033 | 0 | tablet_meta_info.skip_write_index_on_load); |
1034 | 0 | for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) { |
1035 | 0 | rowset_meta->tablet_schema()->set_skip_write_index_on_load( |
1036 | 0 | tablet_meta_info.skip_write_index_on_load); |
1037 | 0 | } |
1038 | 0 | tablet->tablet_schema_unlocked()->set_skip_write_index_on_load( |
1039 | 0 | tablet_meta_info.skip_write_index_on_load); |
1040 | 0 | need_to_save = true; |
1041 | 0 | } |
1042 | 0 | if (need_to_save) { |
1043 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
1044 | 0 | tablet->save_meta(); |
1045 | 0 | } |
1046 | 0 | } |
1047 | |
|
1048 | 0 | LOG(INFO) << "finish update tablet meta task. signature=" << req.signature; |
1049 | 0 | if (req.signature != -1) { |
1050 | 0 | TFinishTaskRequest finish_task_request; |
1051 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
1052 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1053 | 0 | finish_task_request.__set_task_type(req.task_type); |
1054 | 0 | finish_task_request.__set_signature(req.signature); |
1055 | 0 | finish_task(finish_task_request); |
1056 | 0 | remove_task_info(req.task_type, req.signature); |
1057 | 0 | } |
1058 | 0 | } |
1059 | | |
1060 | 0 | void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1061 | 0 | uint32_t checksum = 0; |
1062 | 0 | const auto& check_consistency_req = req.check_consistency_req; |
1063 | 0 | EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id, |
1064 | 0 | check_consistency_req.schema_hash, check_consistency_req.version, |
1065 | 0 | &checksum); |
1066 | 0 | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
1067 | 0 | Status status = engine_task.execute(); |
1068 | 0 | if (!status.ok()) { |
1069 | 0 | LOG_WARNING("failed to check consistency") |
1070 | 0 | .tag("signature", req.signature) |
1071 | 0 | .tag("tablet_id", check_consistency_req.tablet_id) |
1072 | 0 | .error(status); |
1073 | 0 | } else { |
1074 | 0 | LOG_INFO("successfully check consistency") |
1075 | 0 | .tag("signature", req.signature) |
1076 | 0 | .tag("tablet_id", check_consistency_req.tablet_id) |
1077 | 0 | .tag("checksum", checksum); |
1078 | 0 | } |
1079 | |
|
1080 | 0 | TFinishTaskRequest finish_task_request; |
1081 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1082 | 0 | finish_task_request.__set_task_type(req.task_type); |
1083 | 0 | finish_task_request.__set_signature(req.signature); |
1084 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
1085 | 0 | finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum)); |
1086 | 0 | finish_task_request.__set_request_version(check_consistency_req.version); |
1087 | |
|
1088 | 0 | finish_task(finish_task_request); |
1089 | 0 | remove_task_info(req.task_type, req.signature); |
1090 | 0 | } |
1091 | | |
1092 | 1.02k | void report_task_callback(const ClusterInfo* cluster_info) { |
1093 | 1.02k | TReportRequest request; |
1094 | 1.02k | if (config::report_random_wait) { |
1095 | 1.02k | random_sleep(5); |
1096 | 1.02k | } |
1097 | 1.02k | request.__isset.tasks = true; |
1098 | 1.02k | { |
1099 | 1.02k | std::lock_guard lock(s_task_signatures_mtx); |
1100 | 1.02k | auto& tasks = request.tasks; |
1101 | 5.55k | for (auto&& [task_type, signatures] : s_task_signatures) { |
1102 | 5.55k | auto& set = tasks[task_type]; |
1103 | 4.35M | for (auto&& signature : signatures) { |
1104 | 4.35M | set.insert(signature); |
1105 | 4.35M | } |
1106 | 5.55k | } |
1107 | 1.02k | } |
1108 | 1.02k | request.__set_backend(BackendOptions::get_local_backend()); |
1109 | 1.02k | request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num()); |
1110 | 1.02k | bool succ = handle_report(request, cluster_info, "task"); |
1111 | 1.02k | report_task_total << 1; |
1112 | 1.02k | if (!succ) [[unlikely]] { |
1113 | 0 | report_task_failed << 1; |
1114 | 0 | } |
1115 | 1.02k | } |
1116 | | |
1117 | 313 | void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) { |
1118 | 313 | TReportRequest request; |
1119 | 313 | request.__set_backend(BackendOptions::get_local_backend()); |
1120 | 313 | request.__isset.disks = true; |
1121 | | |
1122 | 313 | std::vector<DataDirInfo> data_dir_infos; |
1123 | 313 | static_cast<void>(engine.get_all_data_dir_info(&data_dir_infos, true /* update */)); |
1124 | | |
1125 | 349 | for (auto& root_path_info : data_dir_infos) { |
1126 | 349 | TDisk disk; |
1127 | 349 | disk.__set_root_path(root_path_info.path); |
1128 | 349 | disk.__set_path_hash(root_path_info.path_hash); |
1129 | 349 | disk.__set_storage_medium(root_path_info.storage_medium); |
1130 | 349 | disk.__set_disk_total_capacity(root_path_info.disk_capacity); |
1131 | 349 | disk.__set_data_used_capacity(root_path_info.local_used_capacity); |
1132 | 349 | disk.__set_remote_used_capacity(root_path_info.remote_used_capacity); |
1133 | 349 | disk.__set_disk_available_capacity(root_path_info.available); |
1134 | 349 | disk.__set_trash_used_capacity(root_path_info.trash_used_capacity); |
1135 | 349 | disk.__set_used(root_path_info.is_used); |
1136 | 349 | request.disks[root_path_info.path] = disk; |
1137 | 349 | } |
1138 | 313 | request.__set_num_cores(CpuInfo::num_cores()); |
1139 | 313 | request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 |
1140 | 313 | ? config::pipeline_executor_size |
1141 | 313 | : CpuInfo::num_cores()); |
1142 | 313 | bool succ = handle_report(request, cluster_info, "disk"); |
1143 | 313 | report_disk_total << 1; |
1144 | 313 | if (!succ) [[unlikely]] { |
1145 | 0 | report_disk_failed << 1; |
1146 | 0 | } |
1147 | 313 | } |
1148 | | |
1149 | 124 | void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { |
1150 | | // Random sleep 1~5 seconds before doing report. |
1151 | | // In order to avoid the problem that the FE receives many report requests at the same time |
1152 | | // and can not be processed. |
1153 | 124 | if (config::report_random_wait) { |
1154 | 124 | random_sleep(5); |
1155 | 124 | } |
1156 | 124 | (void)engine; // To be used in the future |
1157 | | |
1158 | 124 | TReportRequest request; |
1159 | 124 | request.__set_backend(BackendOptions::get_local_backend()); |
1160 | 124 | request.__isset.disks = true; |
1161 | | |
1162 | | // TODO(deardeng): report disk info in cloud mode. And make it more clear |
1163 | | // that report CPU by using a separte report procedure |
1164 | | // or abstracting disk report as "host info report" |
1165 | 124 | request.__set_num_cores(CpuInfo::num_cores()); |
1166 | 124 | request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 |
1167 | 124 | ? config::pipeline_executor_size |
1168 | 124 | : CpuInfo::num_cores()); |
1169 | 124 | bool succ = handle_report(request, cluster_info, "disk"); |
1170 | 124 | report_disk_total << 1; |
1171 | 124 | report_disk_failed << !succ; |
1172 | 124 | } |
1173 | | |
1174 | 150 | void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) { |
1175 | 150 | if (config::report_random_wait) { |
1176 | 150 | random_sleep(5); |
1177 | 150 | } |
1178 | | |
1179 | 150 | TReportRequest request; |
1180 | 150 | request.__set_backend(BackendOptions::get_local_backend()); |
1181 | 150 | request.__isset.tablets = true; |
1182 | | |
1183 | 150 | increase_report_version(); |
1184 | 150 | uint64_t report_version; |
1185 | 150 | for (int i = 0; i < 5; i++) { |
1186 | 150 | request.tablets.clear(); |
1187 | 150 | report_version = s_report_version; |
1188 | 150 | engine.tablet_manager()->build_all_report_tablets_info(&request.tablets); |
1189 | 150 | if (report_version == s_report_version) { |
1190 | 150 | break; |
1191 | 150 | } |
1192 | 150 | } |
1193 | | |
1194 | 150 | if (report_version < s_report_version) { |
1195 | | // TODO llj This can only reduce the possibility for report error, but can't avoid it. |
1196 | | // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this |
1197 | | // report, and the report version has a small probability that it has not been updated in time. When FE |
1198 | | // receives this report, it is possible to delete the new tablet. |
1199 | 0 | LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; |
1200 | 0 | DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); |
1201 | 0 | return; |
1202 | 0 | } |
1203 | | |
1204 | 150 | std::map<int64_t, int64_t> partitions_version; |
1205 | 150 | engine.tablet_manager()->get_partitions_visible_version(&partitions_version); |
1206 | 150 | request.__set_partitions_version(std::move(partitions_version)); |
1207 | | |
1208 | 150 | int64_t max_compaction_score = |
1209 | 150 | std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(), |
1210 | 150 | DorisMetrics::instance()->tablet_base_max_compaction_score->value()); |
1211 | 150 | request.__set_tablet_max_compaction_score(max_compaction_score); |
1212 | 150 | request.__set_report_version(report_version); |
1213 | | |
1214 | | // report storage policy and resource |
1215 | 150 | auto& storage_policy_list = request.storage_policy; |
1216 | 150 | for (auto [id, version] : get_storage_policy_ids()) { |
1217 | 60 | auto& storage_policy = storage_policy_list.emplace_back(); |
1218 | 60 | storage_policy.__set_id(id); |
1219 | 60 | storage_policy.__set_version(version); |
1220 | 60 | } |
1221 | 150 | request.__isset.storage_policy = true; |
1222 | 150 | auto& resource_list = request.resource; |
1223 | 160 | for (auto [id_str, version] : get_storage_resource_ids()) { |
1224 | 160 | auto& resource = resource_list.emplace_back(); |
1225 | 160 | int64_t id = -1; |
1226 | 160 | if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id); |
1227 | 160 | ec != std::errc {}) [[unlikely]] { |
1228 | 0 | LOG(ERROR) << "invalid resource id format: " << id_str; |
1229 | 160 | } else { |
1230 | 160 | resource.__set_id(id); |
1231 | 160 | resource.__set_version(version); |
1232 | 160 | } |
1233 | 160 | } |
1234 | 150 | request.__isset.resource = true; |
1235 | | |
1236 | 150 | bool succ = handle_report(request, cluster_info, "tablet"); |
1237 | 150 | report_tablet_total << 1; |
1238 | 150 | if (!succ) [[unlikely]] { |
1239 | 0 | report_tablet_failed << 1; |
1240 | 0 | } |
1241 | 150 | } |
1242 | | |
1243 | 65 | void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { |
1244 | | // Random sleep 1~5 seconds before doing report. |
1245 | | // In order to avoid the problem that the FE receives many report requests at the same time |
1246 | | // and can not be processed. |
1247 | 65 | if (config::report_random_wait) { |
1248 | 65 | random_sleep(5); |
1249 | 65 | } |
1250 | | |
1251 | 65 | TReportRequest request; |
1252 | 65 | request.__set_backend(BackendOptions::get_local_backend()); |
1253 | 65 | request.__isset.tablets = true; |
1254 | | |
1255 | 65 | increase_report_version(); |
1256 | 65 | uint64_t report_version; |
1257 | 65 | uint64_t total_num_tablets = 0; |
1258 | 87 | for (int i = 0; i < 5; i++) { |
1259 | 85 | request.tablets.clear(); |
1260 | 85 | report_version = s_report_version; |
1261 | 85 | engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets); |
1262 | 85 | if (report_version == s_report_version) { |
1263 | 63 | break; |
1264 | 63 | } |
1265 | 85 | } |
1266 | | |
1267 | 65 | if (report_version < s_report_version) { |
1268 | 2 | LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; |
1269 | 2 | DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); |
1270 | 2 | return; |
1271 | 2 | } |
1272 | | |
1273 | 63 | request.__set_report_version(report_version); |
1274 | 63 | request.__set_num_tablets(total_num_tablets); |
1275 | | |
1276 | 63 | bool succ = handle_report(request, cluster_info, "tablet"); |
1277 | 63 | report_tablet_total << 1; |
1278 | 63 | if (!succ) [[unlikely]] { |
1279 | 0 | report_tablet_failed << 1; |
1280 | 0 | } |
1281 | 63 | } |
1282 | | |
1283 | 16 | void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
1284 | 16 | const auto& upload_request = req.upload_req; |
1285 | | |
1286 | 16 | LOG(INFO) << "get upload task. signature=" << req.signature |
1287 | 16 | << ", job_id=" << upload_request.job_id; |
1288 | | |
1289 | 16 | std::map<int64_t, std::vector<std::string>> tablet_files; |
1290 | 16 | std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
1291 | 16 | engine, env, upload_request.job_id, req.signature, upload_request.broker_addr, |
1292 | 16 | upload_request.broker_prop); |
1293 | 16 | SCOPED_ATTACH_TASK(loader->resource_ctx()); |
1294 | 16 | Status status = |
1295 | 16 | loader->init(upload_request.__isset.storage_backend ? upload_request.storage_backend |
1296 | 16 | : TStorageBackendType::type::BROKER, |
1297 | 16 | upload_request.__isset.location ? upload_request.location : ""); |
1298 | 16 | if (status.ok()) { |
1299 | 16 | status = loader->upload(upload_request.src_dest_map, &tablet_files); |
1300 | 16 | } |
1301 | | |
1302 | 16 | if (!status.ok()) { |
1303 | 0 | LOG_WARNING("failed to upload") |
1304 | 0 | .tag("signature", req.signature) |
1305 | 0 | .tag("job_id", upload_request.job_id) |
1306 | 0 | .error(status); |
1307 | 16 | } else { |
1308 | 16 | LOG_INFO("successfully upload") |
1309 | 16 | .tag("signature", req.signature) |
1310 | 16 | .tag("job_id", upload_request.job_id); |
1311 | 16 | } |
1312 | | |
1313 | 16 | TFinishTaskRequest finish_task_request; |
1314 | 16 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1315 | 16 | finish_task_request.__set_task_type(req.task_type); |
1316 | 16 | finish_task_request.__set_signature(req.signature); |
1317 | 16 | finish_task_request.__set_task_status(status.to_thrift()); |
1318 | 16 | finish_task_request.__set_tablet_files(tablet_files); |
1319 | | |
1320 | 16 | finish_task(finish_task_request); |
1321 | 16 | remove_task_info(req.task_type, req.signature); |
1322 | 16 | } |
1323 | | |
1324 | 22 | void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
1325 | 22 | const auto& download_request = req.download_req; |
1326 | 22 | LOG(INFO) << "get download task. signature=" << req.signature |
1327 | 22 | << ", job_id=" << download_request.job_id |
1328 | 22 | << ", task detail: " << apache::thrift::ThriftDebugString(download_request); |
1329 | | |
1330 | | // TODO: download |
1331 | 22 | std::vector<int64_t> downloaded_tablet_ids; |
1332 | | |
1333 | 22 | auto status = Status::OK(); |
1334 | 22 | if (download_request.__isset.remote_tablet_snapshots) { |
1335 | 0 | std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
1336 | 0 | engine, env, download_request.job_id, req.signature); |
1337 | 0 | status = loader->remote_http_download(download_request.remote_tablet_snapshots, |
1338 | 0 | &downloaded_tablet_ids); |
1339 | 22 | } else { |
1340 | 22 | std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
1341 | 22 | engine, env, download_request.job_id, req.signature, download_request.broker_addr, |
1342 | 22 | download_request.broker_prop); |
1343 | 22 | status = loader->init(download_request.__isset.storage_backend |
1344 | 22 | ? download_request.storage_backend |
1345 | 22 | : TStorageBackendType::type::BROKER, |
1346 | 22 | download_request.__isset.location ? download_request.location : ""); |
1347 | 22 | if (status.ok()) { |
1348 | 22 | status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids); |
1349 | 22 | } |
1350 | 22 | } |
1351 | | |
1352 | 22 | if (!status.ok()) { |
1353 | 0 | LOG_WARNING("failed to download") |
1354 | 0 | .tag("signature", req.signature) |
1355 | 0 | .tag("job_id", download_request.job_id) |
1356 | 0 | .error(status); |
1357 | 22 | } else { |
1358 | 22 | LOG_INFO("successfully download") |
1359 | 22 | .tag("signature", req.signature) |
1360 | 22 | .tag("job_id", download_request.job_id); |
1361 | 22 | } |
1362 | | |
1363 | 22 | TFinishTaskRequest finish_task_request; |
1364 | 22 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1365 | 22 | finish_task_request.__set_task_type(req.task_type); |
1366 | 22 | finish_task_request.__set_signature(req.signature); |
1367 | 22 | finish_task_request.__set_task_status(status.to_thrift()); |
1368 | 22 | finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids); |
1369 | | |
1370 | 22 | finish_task(finish_task_request); |
1371 | 22 | remove_task_info(req.task_type, req.signature); |
1372 | 22 | } |
1373 | | |
1374 | 0 | void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
1375 | 0 | const auto& download_request = req.download_req; |
1376 | 0 | LOG(INFO) << "get download task. signature=" << req.signature |
1377 | 0 | << ", job_id=" << download_request.job_id |
1378 | 0 | << ", task detail: " << apache::thrift::ThriftDebugString(download_request); |
1379 | |
|
1380 | 0 | std::vector<int64_t> transferred_tablet_ids; |
1381 | |
|
1382 | 0 | auto status = Status::OK(); |
1383 | 0 | if (download_request.__isset.remote_tablet_snapshots) { |
1384 | 0 | status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( |
1385 | 0 | "remote tablet snapshot is not supported."); |
1386 | 0 | } else { |
1387 | 0 | std::unique_ptr<CloudSnapshotLoader> loader = std::make_unique<CloudSnapshotLoader>( |
1388 | 0 | engine, env, download_request.job_id, req.signature, download_request.broker_addr, |
1389 | 0 | download_request.broker_prop); |
1390 | 0 | status = loader->init(download_request.__isset.storage_backend |
1391 | 0 | ? download_request.storage_backend |
1392 | 0 | : TStorageBackendType::type::BROKER, |
1393 | 0 | download_request.__isset.location ? download_request.location : "", |
1394 | 0 | download_request.vault_id); |
1395 | 0 | if (status.ok()) { |
1396 | 0 | status = loader->download(download_request.src_dest_map, &transferred_tablet_ids); |
1397 | 0 | } |
1398 | |
|
1399 | 0 | if (!status.ok()) { |
1400 | 0 | LOG_WARNING("failed to download") |
1401 | 0 | .tag("signature", req.signature) |
1402 | 0 | .tag("job_id", download_request.job_id) |
1403 | 0 | .error(status); |
1404 | 0 | } else { |
1405 | 0 | LOG_INFO("successfully download") |
1406 | 0 | .tag("signature", req.signature) |
1407 | 0 | .tag("job_id", download_request.job_id); |
1408 | 0 | } |
1409 | |
|
1410 | 0 | TFinishTaskRequest finish_task_request; |
1411 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1412 | 0 | finish_task_request.__set_task_type(req.task_type); |
1413 | 0 | finish_task_request.__set_signature(req.signature); |
1414 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
1415 | 0 | finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids); |
1416 | |
|
1417 | 0 | finish_task(finish_task_request); |
1418 | 0 | remove_task_info(req.task_type, req.signature); |
1419 | 0 | } |
1420 | 0 | } |
1421 | | |
1422 | 320 | void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1423 | 320 | const auto& snapshot_request = req.snapshot_req; |
1424 | | |
1425 | 320 | LOG(INFO) << "get snapshot task. signature=" << req.signature; |
1426 | | |
1427 | 320 | std::string snapshot_path; |
1428 | 320 | bool allow_incremental_clone = false; // not used |
1429 | 320 | std::vector<std::string> snapshot_files; |
1430 | 320 | Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path, |
1431 | 320 | &allow_incremental_clone); |
1432 | 320 | if (status.ok() && snapshot_request.__isset.list_files) { |
1433 | | // list and save all snapshot files |
1434 | | // snapshot_path like: data/snapshot/20180417205230.1.86400 |
1435 | | // we need to add subdir: tablet_id/schema_hash/ |
1436 | 318 | std::vector<io::FileInfo> files; |
1437 | 318 | bool exists = true; |
1438 | 318 | io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id, |
1439 | 318 | snapshot_request.schema_hash); |
1440 | 318 | status = io::global_local_filesystem()->list(path, true, &files, &exists); |
1441 | 318 | if (status.ok()) { |
1442 | 338 | for (auto& file : files) { |
1443 | 338 | snapshot_files.push_back(file.file_name); |
1444 | 338 | } |
1445 | 316 | } |
1446 | 318 | } |
1447 | 320 | if (!status.ok()) { |
1448 | 0 | LOG_WARNING("failed to make snapshot") |
1449 | 0 | .tag("signature", req.signature) |
1450 | 0 | .tag("tablet_id", snapshot_request.tablet_id) |
1451 | 0 | .tag("version", snapshot_request.version) |
1452 | 0 | .error(status); |
1453 | 320 | } else { |
1454 | 320 | LOG_INFO("successfully make snapshot") |
1455 | 320 | .tag("signature", req.signature) |
1456 | 320 | .tag("tablet_id", snapshot_request.tablet_id) |
1457 | 320 | .tag("version", snapshot_request.version) |
1458 | 320 | .tag("snapshot_path", snapshot_path); |
1459 | 320 | } |
1460 | | |
1461 | 320 | TFinishTaskRequest finish_task_request; |
1462 | 320 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1463 | 320 | finish_task_request.__set_task_type(req.task_type); |
1464 | 320 | finish_task_request.__set_signature(req.signature); |
1465 | 320 | finish_task_request.__set_snapshot_path(snapshot_path); |
1466 | 320 | finish_task_request.__set_snapshot_files(snapshot_files); |
1467 | 320 | finish_task_request.__set_task_status(status.to_thrift()); |
1468 | | |
1469 | 320 | finish_task(finish_task_request); |
1470 | 320 | remove_task_info(req.task_type, req.signature); |
1471 | 320 | } |
1472 | | |
1473 | 320 | void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1474 | 320 | const auto& release_snapshot_request = req.release_snapshot_req; |
1475 | | |
1476 | 320 | LOG(INFO) << "get release snapshot task. signature=" << req.signature; |
1477 | | |
1478 | 320 | const std::string& snapshot_path = release_snapshot_request.snapshot_path; |
1479 | 320 | Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path); |
1480 | 320 | if (!status.ok()) { |
1481 | 0 | LOG_WARNING("failed to release snapshot") |
1482 | 0 | .tag("signature", req.signature) |
1483 | 0 | .tag("snapshot_path", snapshot_path) |
1484 | 0 | .error(status); |
1485 | 320 | } else { |
1486 | 320 | LOG_INFO("successfully release snapshot") |
1487 | 320 | .tag("signature", req.signature) |
1488 | 320 | .tag("snapshot_path", snapshot_path); |
1489 | 320 | } |
1490 | | |
1491 | 320 | TFinishTaskRequest finish_task_request; |
1492 | 320 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1493 | 320 | finish_task_request.__set_task_type(req.task_type); |
1494 | 320 | finish_task_request.__set_signature(req.signature); |
1495 | 320 | finish_task_request.__set_task_status(status.to_thrift()); |
1496 | | |
1497 | 320 | finish_task(finish_task_request); |
1498 | 320 | remove_task_info(req.task_type, req.signature); |
1499 | 320 | } |
1500 | | |
1501 | 0 | void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
1502 | 0 | const auto& release_snapshot_request = req.release_snapshot_req; |
1503 | |
|
1504 | 0 | LOG(INFO) << "get release snapshot task. signature=" << req.signature; |
1505 | |
|
1506 | 0 | Status status = engine.cloud_snapshot_mgr().release_snapshot( |
1507 | 0 | release_snapshot_request.tablet_id, release_snapshot_request.is_job_completed); |
1508 | |
|
1509 | 0 | if (!status.ok()) { |
1510 | 0 | LOG_WARNING("failed to release snapshot") |
1511 | 0 | .tag("signature", req.signature) |
1512 | 0 | .tag("tablet_id", release_snapshot_request.tablet_id) |
1513 | 0 | .tag("is_job_completed", release_snapshot_request.is_job_completed) |
1514 | 0 | .error(status); |
1515 | 0 | } else { |
1516 | 0 | LOG_INFO("successfully release snapshot") |
1517 | 0 | .tag("signature", req.signature) |
1518 | 0 | .tag("tablet_id", release_snapshot_request.tablet_id) |
1519 | 0 | .tag("is_job_completed", release_snapshot_request.is_job_completed); |
1520 | 0 | } |
1521 | |
|
1522 | 0 | TFinishTaskRequest finish_task_request; |
1523 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1524 | 0 | finish_task_request.__set_task_type(req.task_type); |
1525 | 0 | finish_task_request.__set_signature(req.signature); |
1526 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
1527 | |
|
1528 | 0 | finish_task(finish_task_request); |
1529 | 0 | remove_task_info(req.task_type, req.signature); |
1530 | 0 | } |
1531 | | |
1532 | 172 | void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
1533 | 172 | const auto& move_dir_req = req.move_dir_req; |
1534 | | |
1535 | 172 | LOG(INFO) << "get move dir task. signature=" << req.signature |
1536 | 172 | << ", job_id=" << move_dir_req.job_id; |
1537 | 172 | Status status; |
1538 | 172 | auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id); |
1539 | 172 | if (tablet == nullptr) { |
1540 | 0 | status = Status::InvalidArgument("Could not find tablet"); |
1541 | 172 | } else { |
1542 | 172 | SnapshotLoader loader(engine, env, move_dir_req.job_id, move_dir_req.tablet_id); |
1543 | 172 | status = loader.move(move_dir_req.src, tablet, true); |
1544 | 172 | } |
1545 | | |
1546 | 172 | if (!status.ok()) { |
1547 | 0 | LOG_WARNING("failed to move dir") |
1548 | 0 | .tag("signature", req.signature) |
1549 | 0 | .tag("job_id", move_dir_req.job_id) |
1550 | 0 | .tag("tablet_id", move_dir_req.tablet_id) |
1551 | 0 | .tag("src", move_dir_req.src) |
1552 | 0 | .error(status); |
1553 | 172 | } else { |
1554 | 172 | LOG_INFO("successfully move dir") |
1555 | 172 | .tag("signature", req.signature) |
1556 | 172 | .tag("job_id", move_dir_req.job_id) |
1557 | 172 | .tag("tablet_id", move_dir_req.tablet_id) |
1558 | 172 | .tag("src", move_dir_req.src); |
1559 | 172 | } |
1560 | | |
1561 | 172 | TFinishTaskRequest finish_task_request; |
1562 | 172 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1563 | 172 | finish_task_request.__set_task_type(req.task_type); |
1564 | 172 | finish_task_request.__set_signature(req.signature); |
1565 | 172 | finish_task_request.__set_task_status(status.to_thrift()); |
1566 | | |
1567 | 172 | finish_task(finish_task_request); |
1568 | 172 | remove_task_info(req.task_type, req.signature); |
1569 | 172 | } |
1570 | | |
1571 | 0 | void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
1572 | 0 | const auto& move_dir_req = req.move_dir_req; |
1573 | |
|
1574 | 0 | LOG(INFO) << "get move dir task. signature=" << req.signature |
1575 | 0 | << ", job_id=" << move_dir_req.job_id; |
1576 | |
|
1577 | 0 | Status status = engine.cloud_snapshot_mgr().commit_snapshot(move_dir_req.tablet_id); |
1578 | 0 | if (!status.ok()) { |
1579 | 0 | LOG_WARNING("failed to move dir") |
1580 | 0 | .tag("signature", req.signature) |
1581 | 0 | .tag("job_id", move_dir_req.job_id) |
1582 | 0 | .tag("tablet_id", move_dir_req.tablet_id) |
1583 | 0 | .error(status); |
1584 | 0 | } else { |
1585 | 0 | LOG_INFO("successfully move dir") |
1586 | 0 | .tag("signature", req.signature) |
1587 | 0 | .tag("job_id", move_dir_req.job_id) |
1588 | 0 | .tag("tablet_id", move_dir_req.tablet_id); |
1589 | 0 | } |
1590 | |
|
1591 | 0 | TFinishTaskRequest finish_task_request; |
1592 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1593 | 0 | finish_task_request.__set_task_type(req.task_type); |
1594 | 0 | finish_task_request.__set_signature(req.signature); |
1595 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
1596 | |
|
1597 | 0 | finish_task(finish_task_request); |
1598 | 0 | remove_task_info(req.task_type, req.signature); |
1599 | 0 | } |
1600 | | |
1601 | 0 | void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1602 | 0 | const auto& compaction_req = req.compaction_req; |
1603 | |
|
1604 | 0 | LOG(INFO) << "get compaction task. signature=" << req.signature |
1605 | 0 | << ", compaction_type=" << compaction_req.type; |
1606 | |
|
1607 | 0 | CompactionType compaction_type; |
1608 | 0 | if (compaction_req.type == "base") { |
1609 | 0 | compaction_type = CompactionType::BASE_COMPACTION; |
1610 | 0 | } else { |
1611 | 0 | compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
1612 | 0 | } |
1613 | |
|
1614 | 0 | auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id); |
1615 | 0 | if (tablet_ptr != nullptr) { |
1616 | 0 | auto* data_dir = tablet_ptr->data_dir(); |
1617 | 0 | if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { |
1618 | 0 | LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id() |
1619 | 0 | << ", compaction_type=" << compaction_type; |
1620 | 0 | return; |
1621 | 0 | } |
1622 | | |
1623 | 0 | Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false); |
1624 | 0 | if (!status.ok()) { |
1625 | 0 | LOG(WARNING) << "failed to submit table compaction task. error=" << status; |
1626 | 0 | } |
1627 | 0 | } |
1628 | 0 | } |
1629 | | |
1630 | | namespace { |
1631 | | |
1632 | 33 | void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) { |
1633 | 33 | Status st; |
1634 | 33 | io::RemoteFileSystemSPtr fs; |
1635 | | |
1636 | 33 | if (!existed_fs) { |
1637 | | // No such FS instance on BE |
1638 | 33 | auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param), |
1639 | 33 | std::to_string(param.id)); |
1640 | 33 | if (!res.has_value()) { |
1641 | 7 | st = std::move(res).error(); |
1642 | 26 | } else { |
1643 | 26 | fs = std::move(res).value(); |
1644 | 26 | } |
1645 | 33 | } else { |
1646 | 0 | DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name; |
1647 | 0 | auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder(); |
1648 | 0 | auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param); |
1649 | 0 | S3ClientConf conf = std::move(new_s3_conf.client_conf); |
1650 | 0 | st = client->reset(conf); |
1651 | 0 | fs = std::move(existed_fs); |
1652 | 0 | } |
1653 | | |
1654 | 33 | if (!st.ok()) { |
1655 | 7 | LOG(WARNING) << "update s3 resource failed: " << st; |
1656 | 26 | } else { |
1657 | 26 | LOG_INFO("successfully update s3 resource") |
1658 | 26 | .tag("resource_id", param.id) |
1659 | 26 | .tag("resource_name", param.name); |
1660 | 26 | put_storage_resource(param.id, {std::move(fs)}, param.version); |
1661 | 26 | } |
1662 | 33 | } |
1663 | | |
1664 | 34 | void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) { |
1665 | 34 | Status st; |
1666 | 34 | io::RemoteFileSystemSPtr fs; |
1667 | 34 | std::string root_path = |
1668 | 34 | param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : ""; |
1669 | | |
1670 | 34 | if (!existed_fs) { |
1671 | | // No such FS instance on BE |
1672 | 34 | auto res = io::HdfsFileSystem::create( |
1673 | 34 | param.hdfs_storage_param, param.hdfs_storage_param.fs_name, |
1674 | 34 | std::to_string(param.id), nullptr, std::move(root_path)); |
1675 | 34 | if (!res.has_value()) { |
1676 | 14 | st = std::move(res).error(); |
1677 | 20 | } else { |
1678 | 20 | fs = std::move(res).value(); |
1679 | 20 | } |
1680 | | |
1681 | 34 | } else { |
1682 | 0 | DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name; |
1683 | | // TODO(plat1ko): update hdfs conf |
1684 | 0 | fs = std::move(existed_fs); |
1685 | 0 | } |
1686 | | |
1687 | 34 | if (!st.ok()) { |
1688 | 14 | LOG(WARNING) << "update hdfs resource failed: " << st; |
1689 | 20 | } else { |
1690 | 20 | LOG_INFO("successfully update hdfs resource") |
1691 | 20 | .tag("resource_id", param.id) |
1692 | 20 | .tag("resource_name", param.name) |
1693 | 20 | .tag("root_path", fs->root_path().string()); |
1694 | 20 | put_storage_resource(param.id, {std::move(fs)}, param.version); |
1695 | 20 | } |
1696 | 34 | } |
1697 | | |
1698 | | } // namespace |
1699 | | |
1700 | 9 | void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1701 | 9 | const auto& push_storage_policy_req = req.push_storage_policy_req; |
1702 | | // refresh resource |
1703 | 67 | for (auto&& param : push_storage_policy_req.resource) { |
1704 | 67 | io::RemoteFileSystemSPtr fs; |
1705 | 67 | if (auto existed_resource = get_storage_resource(param.id); existed_resource) { |
1706 | 0 | if (existed_resource->second >= param.version) { |
1707 | | // Stale request, ignore |
1708 | 0 | continue; |
1709 | 0 | } |
1710 | | |
1711 | 0 | fs = std::move(existed_resource->first.fs); |
1712 | 0 | } |
1713 | | |
1714 | 67 | if (param.__isset.s3_storage_param) { |
1715 | 33 | update_s3_resource(param, std::move(fs)); |
1716 | 34 | } else if (param.__isset.hdfs_storage_param) { |
1717 | 34 | update_hdfs_resource(param, std::move(fs)); |
1718 | 34 | } else { |
1719 | 0 | LOG(WARNING) << "unknown resource=" << param; |
1720 | 0 | } |
1721 | 67 | } |
1722 | | // drop storage policy |
1723 | 9 | for (auto policy_id : push_storage_policy_req.dropped_storage_policy) { |
1724 | 0 | delete_storage_policy(policy_id); |
1725 | 0 | } |
1726 | | // refresh storage policy |
1727 | 24 | for (auto&& storage_policy : push_storage_policy_req.storage_policy) { |
1728 | 24 | auto existed_storage_policy = get_storage_policy(storage_policy.id); |
1729 | 24 | if (existed_storage_policy == nullptr || |
1730 | 24 | existed_storage_policy->version < storage_policy.version) { |
1731 | 24 | auto storage_policy1 = std::make_shared<StoragePolicy>(); |
1732 | 24 | storage_policy1->name = storage_policy.name; |
1733 | 24 | storage_policy1->version = storage_policy.version; |
1734 | 24 | storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime; |
1735 | 24 | storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl; |
1736 | 24 | storage_policy1->resource_id = storage_policy.resource_id; |
1737 | 24 | LOG_INFO("successfully update storage policy") |
1738 | 24 | .tag("storage_policy_id", storage_policy.id) |
1739 | 24 | .tag("storage_policy", storage_policy1->to_string()); |
1740 | 24 | put_storage_policy(storage_policy.id, std::move(storage_policy1)); |
1741 | 24 | } |
1742 | 24 | } |
1743 | 9 | } |
1744 | | |
1745 | 16 | void push_index_policy_callback(const TAgentTaskRequest& req) { |
1746 | 16 | const auto& request = req.push_index_policy_req; |
1747 | 16 | doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes( |
1748 | 16 | request.index_policys, request.dropped_index_policys); |
1749 | 16 | } |
1750 | | |
1751 | 2 | void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1752 | 2 | const auto& push_cooldown_conf_req = req.push_cooldown_conf; |
1753 | 326 | for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) { |
1754 | 326 | int64_t tablet_id = cooldown_conf.tablet_id; |
1755 | 326 | TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id); |
1756 | 326 | if (tablet == nullptr) { |
1757 | 0 | LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id; |
1758 | 0 | continue; |
1759 | 0 | } |
1760 | 326 | if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, |
1761 | 326 | cooldown_conf.cooldown_replica_id) && |
1762 | 326 | cooldown_conf.cooldown_replica_id == tablet->replica_id() && |
1763 | 326 | tablet->tablet_meta()->cooldown_meta_id().initialized()) { |
1764 | 2 | Tablet::async_write_cooldown_meta(tablet); |
1765 | 2 | } |
1766 | 326 | } |
1767 | 2 | } |
1768 | | |
1769 | 6.23k | void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1770 | 6.23k | const auto& create_tablet_req = req.create_tablet_req; |
1771 | 6.23k | RuntimeProfile runtime_profile("CreateTablet"); |
1772 | 6.23k | RuntimeProfile* profile = &runtime_profile; |
1773 | 6.23k | MonotonicStopWatch watch; |
1774 | 6.23k | watch.start(); |
1775 | 6.23k | Defer defer = [&] { |
1776 | 6.23k | auto elapsed_time = static_cast<double>(watch.elapsed_time()); |
1777 | 6.23k | if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) { |
1778 | 0 | COUNTER_UPDATE(profile->total_time_counter(), elapsed_time); |
1779 | 0 | std::stringstream ss; |
1780 | 0 | profile->pretty_print(&ss); |
1781 | 0 | LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str(); |
1782 | 0 | } |
1783 | 6.23k | }; |
1784 | 6.23k | DorisMetrics::instance()->create_tablet_requests_total->increment(1); |
1785 | 6.23k | VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id; |
1786 | | |
1787 | 6.23k | std::vector<TTabletInfo> finish_tablet_infos; |
1788 | 6.23k | VLOG_NOTICE << "create tablet: " << create_tablet_req; |
1789 | 6.23k | Status status = engine.create_tablet(create_tablet_req, profile); |
1790 | 6.23k | if (!status.ok()) { |
1791 | 0 | DorisMetrics::instance()->create_tablet_requests_failed->increment(1); |
1792 | 0 | LOG_WARNING("failed to create tablet, reason={}", status.to_string()) |
1793 | 0 | .tag("signature", req.signature) |
1794 | 0 | .tag("tablet_id", create_tablet_req.tablet_id) |
1795 | 0 | .error(status); |
1796 | 6.23k | } else { |
1797 | 6.23k | increase_report_version(); |
1798 | | // get path hash of the created tablet |
1799 | 6.23k | TabletSharedPtr tablet; |
1800 | 6.23k | { |
1801 | 6.23k | SCOPED_TIMER(ADD_TIMER(profile, "GetTablet")); |
1802 | 6.23k | tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id); |
1803 | 6.23k | } |
1804 | 6.23k | DCHECK(tablet != nullptr); |
1805 | 6.23k | TTabletInfo tablet_info; |
1806 | 6.23k | tablet_info.tablet_id = tablet->tablet_id(); |
1807 | 6.23k | tablet_info.schema_hash = tablet->schema_hash(); |
1808 | 6.23k | tablet_info.version = create_tablet_req.version; |
1809 | | // Useless but it is a required field in TTabletInfo |
1810 | 6.23k | tablet_info.version_hash = 0; |
1811 | 6.23k | tablet_info.row_count = 0; |
1812 | 6.23k | tablet_info.data_size = 0; |
1813 | 6.23k | tablet_info.__set_path_hash(tablet->data_dir()->path_hash()); |
1814 | 6.23k | tablet_info.__set_replica_id(tablet->replica_id()); |
1815 | 6.23k | finish_tablet_infos.push_back(tablet_info); |
1816 | 6.23k | LOG_INFO("successfully create tablet") |
1817 | 6.23k | .tag("signature", req.signature) |
1818 | 6.23k | .tag("tablet_id", create_tablet_req.tablet_id); |
1819 | 6.23k | } |
1820 | 6.23k | TFinishTaskRequest finish_task_request; |
1821 | 6.23k | finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); |
1822 | 6.23k | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1823 | 6.23k | finish_task_request.__set_report_version(s_report_version); |
1824 | 6.23k | finish_task_request.__set_task_type(req.task_type); |
1825 | 6.23k | finish_task_request.__set_signature(req.signature); |
1826 | 6.23k | finish_task_request.__set_task_status(status.to_thrift()); |
1827 | 6.23k | finish_task(finish_task_request); |
1828 | 6.23k | remove_task_info(req.task_type, req.signature); |
1829 | 6.23k | } |
1830 | | |
1831 | 11.3k | void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1832 | 11.3k | const auto& drop_tablet_req = req.drop_tablet_req; |
1833 | 11.3k | Status status; |
1834 | 11.3k | auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false); |
1835 | 11.3k | if (dropped_tablet != nullptr) { |
1836 | 11.3k | status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id, |
1837 | 11.3k | drop_tablet_req.replica_id, |
1838 | 11.3k | drop_tablet_req.is_drop_table_or_partition); |
1839 | 11.3k | } else { |
1840 | 0 | status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id); |
1841 | 0 | } |
1842 | 11.3k | if (status.ok()) { |
1843 | | // if tablet is dropped by fe, then the related txn should also be removed |
1844 | 11.3k | engine.txn_manager()->force_rollback_tablet_related_txns( |
1845 | 11.3k | dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id, |
1846 | 11.3k | dropped_tablet->tablet_uid()); |
1847 | 11.3k | LOG_INFO("successfully drop tablet") |
1848 | 11.3k | .tag("signature", req.signature) |
1849 | 11.3k | .tag("tablet_id", drop_tablet_req.tablet_id) |
1850 | 11.3k | .tag("replica_id", drop_tablet_req.replica_id); |
1851 | 11.3k | } else { |
1852 | 0 | LOG_WARNING("failed to drop tablet") |
1853 | 0 | .tag("signature", req.signature) |
1854 | 0 | .tag("tablet_id", drop_tablet_req.tablet_id) |
1855 | 0 | .tag("replica_id", drop_tablet_req.replica_id) |
1856 | 0 | .error(status); |
1857 | 0 | } |
1858 | | |
1859 | 11.3k | TFinishTaskRequest finish_task_request; |
1860 | 11.3k | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1861 | 11.3k | finish_task_request.__set_task_type(req.task_type); |
1862 | 11.3k | finish_task_request.__set_signature(req.signature); |
1863 | 11.3k | finish_task_request.__set_task_status(status.to_thrift()); |
1864 | | |
1865 | 11.3k | TTabletInfo tablet_info; |
1866 | 11.3k | tablet_info.tablet_id = drop_tablet_req.tablet_id; |
1867 | 11.3k | tablet_info.schema_hash = drop_tablet_req.schema_hash; |
1868 | 11.3k | tablet_info.version = 0; |
1869 | | // Useless but it is a required field in TTabletInfo |
1870 | 11.3k | tablet_info.version_hash = 0; |
1871 | 11.3k | tablet_info.row_count = 0; |
1872 | 11.3k | tablet_info.data_size = 0; |
1873 | | |
1874 | 11.3k | finish_task_request.__set_finish_tablet_infos({tablet_info}); |
1875 | 11.3k | LOG_INFO("successfully drop tablet") |
1876 | 11.3k | .tag("signature", req.signature) |
1877 | 11.3k | .tag("tablet_id", drop_tablet_req.tablet_id); |
1878 | | |
1879 | 11.3k | finish_task(finish_task_request); |
1880 | 11.3k | remove_task_info(req.task_type, req.signature); |
1881 | 11.3k | } |
1882 | | |
1883 | 0 | void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
1884 | 0 | const auto& drop_tablet_req = req.drop_tablet_req; |
1885 | | // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe |
1886 | 0 | Defer defer = [&] { remove_task_info(req.task_type, req.signature); }; |
1887 | 0 | DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", { |
1888 | 0 | LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed") |
1889 | 0 | .tag("tablet_id", drop_tablet_req.tablet_id); |
1890 | 0 | return; |
1891 | 0 | }); |
1892 | 0 | MonotonicStopWatch watch; |
1893 | 0 | watch.start(); |
1894 | 0 | auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); |
1895 | 0 | std::ostringstream rowset_ids_stream; |
1896 | 0 | bool found = false; |
1897 | 0 | for (auto& weak_tablet : weak_tablets) { |
1898 | 0 | auto tablet = weak_tablet.lock(); |
1899 | 0 | if (tablet == nullptr) { |
1900 | 0 | continue; |
1901 | 0 | } |
1902 | 0 | if (tablet->tablet_id() != drop_tablet_req.tablet_id) { |
1903 | 0 | continue; |
1904 | 0 | } |
1905 | 0 | found = true; |
1906 | 0 | auto clean_rowsets = tablet->get_snapshot_rowset(true); |
1907 | | // Get first 10 rowset IDs as comma-separated string, just for log |
1908 | 0 | int count = 0; |
1909 | 0 | for (const auto& rowset : clean_rowsets) { |
1910 | 0 | if (count >= 10) break; |
1911 | 0 | if (count > 0) { |
1912 | 0 | rowset_ids_stream << ","; |
1913 | 0 | } |
1914 | 0 | rowset_ids_stream << rowset->rowset_id().to_string(); |
1915 | 0 | count++; |
1916 | 0 | } |
1917 | |
|
1918 | 0 | CloudTablet::recycle_cached_data(clean_rowsets); |
1919 | 0 | break; |
1920 | 0 | } |
1921 | |
|
1922 | 0 | if (!found) { |
1923 | 0 | LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id |
1924 | 0 | << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)"; |
1925 | 0 | return; |
1926 | 0 | } |
1927 | | |
1928 | 0 | engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); |
1929 | 0 | LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id |
1930 | 0 | << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost " |
1931 | 0 | << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)"; |
1932 | 0 | } |
1933 | | |
1934 | 22 | void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
1935 | 22 | const auto& push_req = req.push_req; |
1936 | | |
1937 | 22 | LOG(INFO) << "get push task. signature=" << req.signature |
1938 | 22 | << " push_type=" << push_req.push_type; |
1939 | 22 | std::vector<TTabletInfo> tablet_infos; |
1940 | | |
1941 | | // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService |
1942 | | // use the arg BackendService_submit_tasks_args.tasks is not const |
1943 | | // and push_req will be modify, so modify is ok |
1944 | 22 | EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos); |
1945 | 22 | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
1946 | 22 | auto status = engine_task.execute(); |
1947 | | |
1948 | | // Return result to fe |
1949 | 22 | TFinishTaskRequest finish_task_request; |
1950 | 22 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1951 | 22 | finish_task_request.__set_task_type(req.task_type); |
1952 | 22 | finish_task_request.__set_signature(req.signature); |
1953 | 22 | if (push_req.push_type == TPushType::DELETE) { |
1954 | 22 | finish_task_request.__set_request_version(push_req.version); |
1955 | 22 | } |
1956 | | |
1957 | 22 | if (status.ok()) { |
1958 | 22 | LOG_INFO("successfully execute push task") |
1959 | 22 | .tag("signature", req.signature) |
1960 | 22 | .tag("tablet_id", push_req.tablet_id) |
1961 | 22 | .tag("push_type", push_req.push_type); |
1962 | 22 | increase_report_version(); |
1963 | 22 | finish_task_request.__set_finish_tablet_infos(tablet_infos); |
1964 | 22 | } else { |
1965 | 0 | LOG_WARNING("failed to execute push task") |
1966 | 0 | .tag("signature", req.signature) |
1967 | 0 | .tag("tablet_id", push_req.tablet_id) |
1968 | 0 | .tag("push_type", push_req.push_type) |
1969 | 0 | .error(status); |
1970 | 0 | } |
1971 | 22 | finish_task_request.__set_task_status(status.to_thrift()); |
1972 | 22 | finish_task_request.__set_report_version(s_report_version); |
1973 | | |
1974 | 22 | finish_task(finish_task_request); |
1975 | 22 | remove_task_info(req.task_type, req.signature); |
1976 | 22 | } |
1977 | | |
1978 | 3.22k | void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
1979 | 3.22k | const auto& push_req = req.push_req; |
1980 | | |
1981 | 3.22k | LOG(INFO) << "get push task. signature=" << req.signature |
1982 | 3.22k | << " push_type=" << push_req.push_type; |
1983 | | |
1984 | | // Return result to fe |
1985 | 3.22k | TFinishTaskRequest finish_task_request; |
1986 | 3.22k | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
1987 | 3.22k | finish_task_request.__set_task_type(req.task_type); |
1988 | 3.22k | finish_task_request.__set_signature(req.signature); |
1989 | | |
1990 | | // Only support DELETE in cloud mode now |
1991 | 3.22k | if (push_req.push_type != TPushType::DELETE) { |
1992 | 0 | finish_task_request.__set_task_status( |
1993 | 0 | Status::NotSupported("push_type {} not is supported", |
1994 | 0 | std::to_string(push_req.push_type)) |
1995 | 0 | .to_thrift()); |
1996 | 0 | return; |
1997 | 0 | } |
1998 | | |
1999 | 3.22k | finish_task_request.__set_request_version(push_req.version); |
2000 | | |
2001 | 3.22k | DorisMetrics::instance()->delete_requests_total->increment(1); |
2002 | 3.22k | auto st = CloudDeleteTask::execute(engine, req.push_req); |
2003 | 3.22k | if (st.ok()) { |
2004 | 3.21k | LOG_INFO("successfully execute push task") |
2005 | 3.21k | .tag("signature", req.signature) |
2006 | 3.21k | .tag("tablet_id", push_req.tablet_id) |
2007 | 3.21k | .tag("push_type", push_req.push_type); |
2008 | 3.21k | increase_report_version(); |
2009 | 3.21k | auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back(); |
2010 | | // Just need tablet_id |
2011 | 3.21k | tablet_info.tablet_id = push_req.tablet_id; |
2012 | 3.21k | finish_task_request.__isset.finish_tablet_infos = true; |
2013 | 3.21k | } else { |
2014 | 6 | DorisMetrics::instance()->delete_requests_failed->increment(1); |
2015 | 6 | LOG_WARNING("failed to execute push task") |
2016 | 6 | .tag("signature", req.signature) |
2017 | 6 | .tag("tablet_id", push_req.tablet_id) |
2018 | 6 | .tag("push_type", push_req.push_type) |
2019 | 6 | .error(st); |
2020 | 6 | } |
2021 | | |
2022 | 3.22k | finish_task_request.__set_task_status(st.to_thrift()); |
2023 | 3.22k | finish_task_request.__set_report_version(s_report_version); |
2024 | | |
2025 | 3.22k | finish_task(finish_task_request); |
2026 | 3.22k | remove_task_info(req.task_type, req.signature); |
2027 | 3.22k | } |
2028 | | |
2029 | | PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine) |
2030 | 6 | : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count, |
2031 | 2.74k | [this](const TAgentTaskRequest& task) { publish_version_callback(task); }), |
2032 | 6 | _engine(engine) {} |
2033 | | |
2034 | | PublishVersionWorkerPool::~PublishVersionWorkerPool() = default; |
2035 | | |
2036 | 2.74k | void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) { |
2037 | 2.74k | const auto& publish_version_req = req.publish_version_req; |
2038 | 2.74k | DorisMetrics::instance()->publish_task_request_total->increment(1); |
2039 | 2.74k | VLOG_NOTICE << "get publish version task. signature=" << req.signature; |
2040 | | |
2041 | 2.74k | std::set<TTabletId> error_tablet_ids; |
2042 | 2.74k | std::map<TTabletId, TVersion> succ_tablets; |
2043 | | // partition_id, tablet_id, publish_version |
2044 | 2.74k | std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets; |
2045 | 2.74k | std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows; |
2046 | 2.74k | uint32_t retry_time = 0; |
2047 | 2.74k | Status status; |
2048 | 2.74k | constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3; |
2049 | 2.74k | while (retry_time < PUBLISH_VERSION_MAX_RETRY) { |
2050 | 2.74k | succ_tablets.clear(); |
2051 | 2.74k | error_tablet_ids.clear(); |
2052 | 2.74k | table_id_to_tablet_id_to_num_delta_rows.clear(); |
2053 | 2.74k | EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids, |
2054 | 2.74k | &succ_tablets, &discontinuous_version_tablets, |
2055 | 2.74k | &table_id_to_tablet_id_to_num_delta_rows); |
2056 | 2.74k | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
2057 | 2.74k | status = engine_task.execute(); |
2058 | 2.74k | if (status.ok()) { |
2059 | 2.74k | break; |
2060 | 2.74k | } |
2061 | | |
2062 | 0 | if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) { |
2063 | | // there are too many missing versions, it has been be added to async |
2064 | | // publish task, so no need to retry here. |
2065 | 0 | if (discontinuous_version_tablets.empty()) { |
2066 | 0 | break; |
2067 | 0 | } |
2068 | 0 | LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, " |
2069 | 0 | << "transaction_id: " << publish_version_req.transaction_id; |
2070 | |
|
2071 | 0 | int64_t time_elapsed = time(nullptr) - req.recv_time; |
2072 | 0 | if (time_elapsed > config::publish_version_task_timeout_s) { |
2073 | 0 | LOG(INFO) << "task elapsed " << time_elapsed |
2074 | 0 | << " seconds since it is inserted to queue, it is timeout"; |
2075 | 0 | break; |
2076 | 0 | } |
2077 | | |
2078 | | // Version not continuous, put to queue and wait pre version publish task execute |
2079 | 0 | PUBLISH_VERSION_count << 1; |
2080 | 0 | auto st = _thread_pool->submit_func([this, req] { |
2081 | 0 | this->publish_version_callback(req); |
2082 | 0 | PUBLISH_VERSION_count << -1; |
2083 | 0 | }); |
2084 | 0 | if (!st.ok()) [[unlikely]] { |
2085 | 0 | PUBLISH_VERSION_count << -1; |
2086 | 0 | status = std::move(st); |
2087 | 0 | } else { |
2088 | 0 | return; |
2089 | 0 | } |
2090 | 0 | } |
2091 | | |
2092 | 0 | LOG_WARNING("failed to publish version") |
2093 | 0 | .tag("transaction_id", publish_version_req.transaction_id) |
2094 | 0 | .tag("error_tablets_num", error_tablet_ids.size()) |
2095 | 0 | .tag("retry_time", retry_time) |
2096 | 0 | .error(status); |
2097 | 0 | ++retry_time; |
2098 | 0 | } |
2099 | | |
2100 | 2.74k | for (auto& item : discontinuous_version_tablets) { |
2101 | 0 | _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item), |
2102 | 0 | publish_version_req.transaction_id, false); |
2103 | 0 | } |
2104 | 2.74k | TFinishTaskRequest finish_task_request; |
2105 | 2.74k | if (!status.ok()) [[unlikely]] { |
2106 | 0 | DorisMetrics::instance()->publish_task_failed_total->increment(1); |
2107 | | // if publish failed, return failed, FE will ignore this error and |
2108 | | // check error tablet ids and FE will also republish this task |
2109 | 0 | LOG_WARNING("failed to publish version") |
2110 | 0 | .tag("signature", req.signature) |
2111 | 0 | .tag("transaction_id", publish_version_req.transaction_id) |
2112 | 0 | .tag("error_tablets_num", error_tablet_ids.size()) |
2113 | 0 | .error(status); |
2114 | 2.74k | } else { |
2115 | 2.74k | if (!config::disable_auto_compaction && |
2116 | 2.74k | (!config::enable_compaction_pause_on_high_memory || |
2117 | 2.74k | !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) { |
2118 | 16.8k | for (auto [tablet_id, _] : succ_tablets) { |
2119 | 16.8k | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
2120 | 16.8k | if (tablet != nullptr) { |
2121 | 16.8k | if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) { |
2122 | 16.8k | tablet->published_count.fetch_add(1); |
2123 | 16.8k | int64_t published_count = tablet->published_count.load(); |
2124 | 16.8k | int32_t max_version_config = tablet->max_version_config(); |
2125 | 16.8k | if (tablet->exceed_version_limit( |
2126 | 16.8k | max_version_config * |
2127 | 16.8k | config::load_trigger_compaction_version_percent / 100) && |
2128 | 16.8k | published_count % 20 == 0) { |
2129 | 0 | auto st = _engine.submit_compaction_task( |
2130 | 0 | tablet, CompactionType::CUMULATIVE_COMPACTION, true, false); |
2131 | 0 | if (!st.ok()) [[unlikely]] { |
2132 | 0 | LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id |
2133 | 0 | << ", published=" << published_count << " : " << st; |
2134 | 0 | } else { |
2135 | 0 | LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id |
2136 | 0 | << ", published:" << published_count; |
2137 | 0 | } |
2138 | 0 | } |
2139 | 16.8k | } |
2140 | 16.8k | } else { |
2141 | 0 | LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id; |
2142 | 0 | } |
2143 | 16.8k | } |
2144 | 2.74k | } |
2145 | 2.74k | int64_t cost_second = time(nullptr) - req.recv_time; |
2146 | 2.74k | g_publish_version_latency << cost_second; |
2147 | 2.74k | LOG_INFO("successfully publish version") |
2148 | 2.74k | .tag("signature", req.signature) |
2149 | 2.74k | .tag("transaction_id", publish_version_req.transaction_id) |
2150 | 2.74k | .tag("tablets_num", succ_tablets.size()) |
2151 | 2.74k | .tag("cost(s)", cost_second); |
2152 | 2.74k | } |
2153 | | |
2154 | 2.74k | status.to_thrift(&finish_task_request.task_status); |
2155 | 2.74k | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
2156 | 2.74k | finish_task_request.__set_task_type(req.task_type); |
2157 | 2.74k | finish_task_request.__set_signature(req.signature); |
2158 | 2.74k | finish_task_request.__set_report_version(s_report_version); |
2159 | 2.74k | finish_task_request.__set_succ_tablets(succ_tablets); |
2160 | 2.74k | finish_task_request.__set_error_tablet_ids( |
2161 | 2.74k | std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end())); |
2162 | 2.74k | finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows( |
2163 | 2.74k | table_id_to_tablet_id_to_num_delta_rows); |
2164 | 2.74k | finish_task(finish_task_request); |
2165 | 2.74k | remove_task_info(req.task_type, req.signature); |
2166 | 2.74k | } |
2167 | | |
2168 | 14 | void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2169 | 14 | const auto& clear_transaction_task_req = req.clear_transaction_task_req; |
2170 | 14 | LOG(INFO) << "get clear transaction task. signature=" << req.signature |
2171 | 14 | << ", transaction_id=" << clear_transaction_task_req.transaction_id |
2172 | 14 | << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); |
2173 | | |
2174 | 14 | Status status; |
2175 | | |
2176 | 14 | if (clear_transaction_task_req.transaction_id > 0) { |
2177 | | // transaction_id should be greater than zero. |
2178 | | // If it is not greater than zero, no need to execute |
2179 | | // the following clear_transaction_task() function. |
2180 | 14 | if (!clear_transaction_task_req.partition_id.empty()) { |
2181 | 2 | engine.clear_transaction_task(clear_transaction_task_req.transaction_id, |
2182 | 2 | clear_transaction_task_req.partition_id); |
2183 | 12 | } else { |
2184 | 12 | engine.clear_transaction_task(clear_transaction_task_req.transaction_id); |
2185 | 12 | } |
2186 | 14 | LOG(INFO) << "finish to clear transaction task. signature=" << req.signature |
2187 | 14 | << ", transaction_id=" << clear_transaction_task_req.transaction_id; |
2188 | 14 | } else { |
2189 | 0 | LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id |
2190 | 0 | << ". signature= " << req.signature; |
2191 | 0 | } |
2192 | | |
2193 | 14 | TFinishTaskRequest finish_task_request; |
2194 | 14 | finish_task_request.__set_task_status(status.to_thrift()); |
2195 | 14 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
2196 | 14 | finish_task_request.__set_task_type(req.task_type); |
2197 | 14 | finish_task_request.__set_signature(req.signature); |
2198 | | |
2199 | 14 | finish_task(finish_task_request); |
2200 | 14 | remove_task_info(req.task_type, req.signature); |
2201 | 14 | } |
2202 | | |
2203 | 0 | void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2204 | 0 | int64_t signature = req.signature; |
2205 | 0 | LOG(INFO) << "get alter table task, signature: " << signature; |
2206 | 0 | bool is_task_timeout = false; |
2207 | 0 | if (req.__isset.recv_time) { |
2208 | 0 | int64_t time_elapsed = time(nullptr) - req.recv_time; |
2209 | 0 | if (time_elapsed > config::report_task_interval_seconds * 20) { |
2210 | 0 | LOG(INFO) << "task elapsed " << time_elapsed |
2211 | 0 | << " seconds since it is inserted to queue, it is timeout"; |
2212 | 0 | is_task_timeout = true; |
2213 | 0 | } |
2214 | 0 | } |
2215 | 0 | if (!is_task_timeout) { |
2216 | 0 | TFinishTaskRequest finish_task_request; |
2217 | 0 | TTaskType::type task_type = req.task_type; |
2218 | 0 | alter_tablet(engine, req, signature, task_type, &finish_task_request); |
2219 | 0 | finish_task(finish_task_request); |
2220 | 0 | } |
2221 | 0 | doris::g_fragment_executing_count << -1; |
2222 | 0 | int64_t now = duration_cast<std::chrono::milliseconds>( |
2223 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
2224 | 0 | .count(); |
2225 | 0 | g_fragment_last_active_time.set_value(now); |
2226 | 0 | remove_task_info(req.task_type, req.signature); |
2227 | 0 | } |
2228 | | |
2229 | 10.6k | void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
2230 | 10.6k | int64_t signature = req.signature; |
2231 | 10.6k | LOG(INFO) << "get alter table task, signature: " << signature; |
2232 | 10.6k | bool is_task_timeout = false; |
2233 | 10.6k | if (req.__isset.recv_time) { |
2234 | 10.6k | int64_t time_elapsed = time(nullptr) - req.recv_time; |
2235 | 10.6k | if (time_elapsed > config::report_task_interval_seconds * 20) { |
2236 | 0 | LOG(INFO) << "task elapsed " << time_elapsed |
2237 | 0 | << " seconds since it is inserted to queue, it is timeout"; |
2238 | 0 | is_task_timeout = true; |
2239 | 0 | } |
2240 | 10.6k | } |
2241 | 10.6k | if (!is_task_timeout) { |
2242 | 10.6k | TFinishTaskRequest finish_task_request; |
2243 | 10.6k | TTaskType::type task_type = req.task_type; |
2244 | 10.6k | alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request); |
2245 | 10.6k | finish_task(finish_task_request); |
2246 | 10.6k | } |
2247 | 10.6k | doris::g_fragment_executing_count << -1; |
2248 | 10.6k | int64_t now = duration_cast<std::chrono::milliseconds>( |
2249 | 10.6k | std::chrono::system_clock::now().time_since_epoch()) |
2250 | 10.6k | .count(); |
2251 | 10.6k | g_fragment_last_active_time.set_value(now); |
2252 | | |
2253 | | // Clean up alter_version before remove_task_info to avoid race: |
2254 | | // remove_task_info allows same-signature re-submit, whose pre_submit_callback |
2255 | | // would set alter_version, then this cleanup would wipe it. |
2256 | 10.6k | if (req.__isset.alter_tablet_req_v2) { |
2257 | 10.6k | const auto& alter_req = req.alter_tablet_req_v2; |
2258 | 10.6k | auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id); |
2259 | 10.6k | auto base_tablet = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id); |
2260 | 10.6k | if (new_tablet.has_value()) { |
2261 | 10.6k | new_tablet.value()->set_alter_version(-1); |
2262 | 10.6k | } |
2263 | 10.6k | if (base_tablet.has_value()) { |
2264 | 10.6k | base_tablet.value()->set_alter_version(-1); |
2265 | 10.6k | } |
2266 | 10.6k | } |
2267 | | |
2268 | 10.6k | remove_task_info(req.task_type, req.signature); |
2269 | 10.6k | } |
2270 | | |
2271 | 10.6k | void set_alter_version_before_enqueue(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
2272 | 10.6k | if (!req.__isset.alter_tablet_req_v2) { |
2273 | 0 | return; |
2274 | 0 | } |
2275 | 10.6k | const auto& alter_req = req.alter_tablet_req_v2; |
2276 | 10.6k | if (alter_req.alter_version <= 1) { |
2277 | 5.43k | return; |
2278 | 5.43k | } |
2279 | 5.22k | auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id); |
2280 | 5.22k | if (!new_tablet.has_value() || new_tablet.value()->tablet_state() == TABLET_RUNNING) { |
2281 | 54 | return; |
2282 | 54 | } |
2283 | 5.17k | auto base_tablet = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id); |
2284 | 5.17k | if (!base_tablet.has_value()) { |
2285 | 0 | return; |
2286 | 0 | } |
2287 | 5.17k | new_tablet.value()->set_alter_version(alter_req.alter_version); |
2288 | 5.17k | base_tablet.value()->set_alter_version(alter_req.alter_version); |
2289 | 5.17k | LOG(INFO) << "set alter_version=" << alter_req.alter_version |
2290 | 5.17k | << " before enqueue, base_tablet=" << alter_req.base_tablet_id |
2291 | 5.17k | << ", new_tablet=" << alter_req.new_tablet_id; |
2292 | 5.17k | } |
2293 | | |
2294 | 0 | void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2295 | 0 | std::unordered_map<int64_t, int64_t> gc_tablet_infos; |
2296 | 0 | if (!req.__isset.gc_binlog_req) { |
2297 | 0 | LOG(WARNING) << "gc binlog task is not valid"; |
2298 | 0 | return; |
2299 | 0 | } |
2300 | 0 | if (!req.gc_binlog_req.__isset.tablet_gc_binlog_infos) { |
2301 | 0 | LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid"; |
2302 | 0 | return; |
2303 | 0 | } |
2304 | | |
2305 | 0 | const auto& tablet_gc_binlog_infos = req.gc_binlog_req.tablet_gc_binlog_infos; |
2306 | 0 | for (auto&& tablet_info : tablet_gc_binlog_infos) { |
2307 | | // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash); |
2308 | 0 | gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version); |
2309 | 0 | } |
2310 | |
|
2311 | 0 | engine.gc_binlogs(gc_tablet_infos); |
2312 | 0 | } |
2313 | | |
2314 | 2.67k | void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2315 | 2.67k | const TVisibleVersionReq& visible_version_req = req.visible_version_req; |
2316 | 2.67k | engine.tablet_manager()->update_partitions_visible_version( |
2317 | 2.67k | visible_version_req.partition_version); |
2318 | 2.67k | } |
2319 | | |
2320 | | void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, |
2321 | 0 | const TAgentTaskRequest& req) { |
2322 | 0 | const auto& clone_req = req.clone_req; |
2323 | |
|
2324 | 0 | DorisMetrics::instance()->clone_requests_total->increment(1); |
2325 | 0 | LOG(INFO) << "get clone task. signature=" << req.signature; |
2326 | |
|
2327 | 0 | std::vector<TTabletInfo> tablet_infos; |
2328 | 0 | EngineCloneTask engine_task(engine, clone_req, cluster_info, req.signature, &tablet_infos); |
2329 | 0 | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
2330 | 0 | auto status = engine_task.execute(); |
2331 | | // Return result to fe |
2332 | 0 | TFinishTaskRequest finish_task_request; |
2333 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
2334 | 0 | finish_task_request.__set_task_type(req.task_type); |
2335 | 0 | finish_task_request.__set_signature(req.signature); |
2336 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
2337 | |
|
2338 | 0 | if (!status.ok()) { |
2339 | 0 | DorisMetrics::instance()->clone_requests_failed->increment(1); |
2340 | 0 | LOG_WARNING("failed to clone tablet") |
2341 | 0 | .tag("signature", req.signature) |
2342 | 0 | .tag("tablet_id", clone_req.tablet_id) |
2343 | 0 | .error(status); |
2344 | 0 | } else { |
2345 | 0 | LOG_INFO("successfully clone tablet") |
2346 | 0 | .tag("signature", req.signature) |
2347 | 0 | .tag("tablet_id", clone_req.tablet_id) |
2348 | 0 | .tag("copy_size", engine_task.get_copy_size()) |
2349 | 0 | .tag("copy_time_ms", engine_task.get_copy_time_ms()); |
2350 | |
|
2351 | 0 | if (engine_task.is_new_tablet()) { |
2352 | 0 | increase_report_version(); |
2353 | 0 | finish_task_request.__set_report_version(s_report_version); |
2354 | 0 | } |
2355 | 0 | finish_task_request.__set_finish_tablet_infos(tablet_infos); |
2356 | 0 | finish_task_request.__set_copy_size(engine_task.get_copy_size()); |
2357 | 0 | finish_task_request.__set_copy_time_ms(engine_task.get_copy_time_ms()); |
2358 | 0 | } |
2359 | |
|
2360 | 0 | finish_task(finish_task_request); |
2361 | 0 | remove_task_info(req.task_type, req.signature); |
2362 | 0 | } |
2363 | | |
2364 | 0 | void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2365 | 0 | const auto& storage_medium_migrate_req = req.storage_medium_migrate_req; |
2366 | | |
2367 | | // check request and get info |
2368 | 0 | TabletSharedPtr tablet; |
2369 | 0 | DataDir* dest_store = nullptr; |
2370 | |
|
2371 | 0 | auto status = check_migrate_request(engine, storage_medium_migrate_req, tablet, &dest_store); |
2372 | 0 | if (status.ok()) { |
2373 | 0 | EngineStorageMigrationTask engine_task(engine, tablet, dest_store); |
2374 | 0 | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
2375 | 0 | status = engine_task.execute(); |
2376 | 0 | } |
2377 | | // fe should ignore this err |
2378 | 0 | if (status.is<FILE_ALREADY_EXIST>()) { |
2379 | 0 | status = Status::OK(); |
2380 | 0 | } |
2381 | 0 | if (!status.ok()) { |
2382 | 0 | LOG_WARNING("failed to migrate storage medium") |
2383 | 0 | .tag("signature", req.signature) |
2384 | 0 | .tag("tablet_id", storage_medium_migrate_req.tablet_id) |
2385 | 0 | .error(status); |
2386 | 0 | } else { |
2387 | 0 | LOG_INFO("successfully migrate storage medium") |
2388 | 0 | .tag("signature", req.signature) |
2389 | 0 | .tag("tablet_id", storage_medium_migrate_req.tablet_id); |
2390 | 0 | } |
2391 | |
|
2392 | 0 | TFinishTaskRequest finish_task_request; |
2393 | 0 | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
2394 | 0 | finish_task_request.__set_task_type(req.task_type); |
2395 | 0 | finish_task_request.__set_signature(req.signature); |
2396 | 0 | finish_task_request.__set_task_status(status.to_thrift()); |
2397 | |
|
2398 | 0 | finish_task(finish_task_request); |
2399 | 0 | remove_task_info(req.task_type, req.signature); |
2400 | 0 | } |
2401 | | |
2402 | 9.32k | void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
2403 | 9.32k | std::vector<TTabletId> error_tablet_ids; |
2404 | 9.32k | std::vector<TTabletId> succ_tablet_ids; |
2405 | 9.32k | Status status; |
2406 | 9.32k | error_tablet_ids.clear(); |
2407 | 9.32k | const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req; |
2408 | 9.32k | CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids, |
2409 | 9.32k | &succ_tablet_ids); |
2410 | 9.32k | SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
2411 | 9.32k | if (req.signature != calc_delete_bitmap_req.transaction_id) { |
2412 | | // transaction_id may not be the same as req.signature, so add a log here |
2413 | 0 | LOG_INFO("begin to execute calc delete bitmap task") |
2414 | 0 | .tag("signature", req.signature) |
2415 | 0 | .tag("transaction_id", calc_delete_bitmap_req.transaction_id); |
2416 | 0 | } |
2417 | 9.32k | status = engine_task.execute(); |
2418 | | |
2419 | 9.32k | TFinishTaskRequest finish_task_request; |
2420 | 9.32k | if (!status) { |
2421 | 0 | DorisMetrics::instance()->publish_task_failed_total->increment(1); |
2422 | 0 | LOG_WARNING("failed to calculate delete bitmap") |
2423 | 0 | .tag("signature", req.signature) |
2424 | 0 | .tag("transaction_id", calc_delete_bitmap_req.transaction_id) |
2425 | 0 | .tag("error_tablets_num", error_tablet_ids.size()) |
2426 | 0 | .error(status); |
2427 | 0 | } |
2428 | | |
2429 | 9.32k | status.to_thrift(&finish_task_request.task_status); |
2430 | 9.32k | finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
2431 | 9.32k | finish_task_request.__set_task_type(req.task_type); |
2432 | 9.32k | finish_task_request.__set_signature(req.signature); |
2433 | 9.32k | finish_task_request.__set_report_version(s_report_version); |
2434 | 9.32k | finish_task_request.__set_error_tablet_ids(error_tablet_ids); |
2435 | 9.32k | finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions); |
2436 | | |
2437 | 9.32k | finish_task(finish_task_request); |
2438 | 9.32k | remove_task_info(req.task_type, req.signature); |
2439 | 9.32k | } |
2440 | | |
2441 | | void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine, |
2442 | 26.5k | const TAgentTaskRequest& req) { |
2443 | 26.5k | if (!config::enable_cloud_make_rs_visible_on_be) { |
2444 | 0 | return; |
2445 | 0 | } |
2446 | 26.5k | LOG(INFO) << "begin to make cloud tmp rs visible, txn_id=" |
2447 | 26.5k | << req.make_cloud_tmp_rs_visible_req.txn_id |
2448 | 26.5k | << ", tablet_count=" << req.make_cloud_tmp_rs_visible_req.tablet_ids.size(); |
2449 | | |
2450 | 26.5k | const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req; |
2451 | 26.5k | auto& tablet_mgr = engine.tablet_mgr(); |
2452 | | |
2453 | 26.5k | int64_t txn_id = make_visible_req.txn_id; |
2454 | 26.5k | int64_t version_update_time_ms = make_visible_req.__isset.version_update_time_ms |
2455 | 26.5k | ? make_visible_req.version_update_time_ms |
2456 | 26.5k | : 0; |
2457 | | |
2458 | | // Process each tablet involved in this transaction on this BE |
2459 | 180k | for (int64_t tablet_id : make_visible_req.tablet_ids) { |
2460 | 180k | auto tablet_result = |
2461 | 180k | tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false, |
2462 | 180k | /* sync_delete_bitmap */ false, |
2463 | 180k | /* sync_stats */ nullptr, /* force_use_only_cached */ true, |
2464 | 180k | /* cache_on_miss */ false); |
2465 | 180k | if (!tablet_result.has_value()) { |
2466 | 0 | continue; |
2467 | 0 | } |
2468 | 180k | auto cloud_tablet = tablet_result.value(); |
2469 | | |
2470 | 180k | int64_t partition_id = cloud_tablet->partition_id(); |
2471 | 180k | auto version_iter = make_visible_req.partition_version_map.find(partition_id); |
2472 | 180k | if (version_iter == make_visible_req.partition_version_map.end()) { |
2473 | 0 | continue; |
2474 | 0 | } |
2475 | 180k | int64_t visible_version = version_iter->second; |
2476 | 180k | DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", { |
2477 | 180k | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
2478 | 180k | auto target_table_id = dp->param<int64_t>("table_id", -1); |
2479 | 180k | auto version = dp->param<int64_t>("version", -1); |
2480 | 180k | if ((target_tablet_id == tablet_id || target_table_id == cloud_tablet->table_id()) && |
2481 | 180k | version == visible_version) { |
2482 | 180k | DBUG_BLOCK |
2483 | 180k | } |
2484 | 180k | }); |
2485 | 180k | cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version, |
2486 | 180k | version_update_time_ms); |
2487 | 180k | } |
2488 | 26.5k | LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id |
2489 | 26.5k | << ", processed_tablets=" << make_visible_req.tablet_ids.size(); |
2490 | 26.5k | } |
2491 | | |
2492 | 0 | void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
2493 | 0 | LOG(INFO) << "clean trash start"; |
2494 | 0 | DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); }) |
2495 | 0 | static_cast<void>(engine.start_trash_sweep(nullptr, true)); |
2496 | 0 | static_cast<void>(engine.notify_listener("REPORT_DISK_STATE")); |
2497 | 0 | LOG(INFO) << "clean trash finish"; |
2498 | 0 | } |
2499 | | |
2500 | 4 | void clean_udf_cache_callback(const TAgentTaskRequest& req) { |
2501 | 4 | const auto& clean_req = req.clean_udf_cache_req; |
2502 | | |
2503 | 4 | if (doris::config::enable_java_support) { |
2504 | 4 | static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature)); |
2505 | 4 | } |
2506 | | |
2507 | 4 | if (clean_req.__isset.function_id && clean_req.function_id > 0) { |
2508 | 0 | UserFunctionCache::instance()->drop_function_cache(clean_req.function_id); |
2509 | 0 | } |
2510 | | |
2511 | 4 | LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature; |
2512 | 4 | } |
2513 | | |
2514 | 1.33k | void report_index_policy_callback(const ClusterInfo* cluster_info) { |
2515 | 1.33k | TReportRequest request; |
2516 | 1.33k | auto& index_policy_list = request.index_policy; |
2517 | 1.33k | const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys(); |
2518 | 24.1k | for (const auto& policy : policys) { |
2519 | 24.1k | index_policy_list.emplace_back(policy.second); |
2520 | 24.1k | } |
2521 | 1.33k | request.__isset.index_policy = true; |
2522 | 1.33k | request.__set_backend(BackendOptions::get_local_backend()); |
2523 | 1.33k | bool succ = handle_report(request, cluster_info, "index_policy"); |
2524 | 1.33k | report_index_policy_total << 1; |
2525 | 1.33k | if (!succ) [[unlikely]] { |
2526 | 0 | report_index_policy_failed << 1; |
2527 | 0 | } |
2528 | 1.33k | } |
2529 | | |
2530 | | #include "common/compile_check_end.h" |
2531 | | } // namespace doris |