be/src/service/http/action/compaction_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/compaction_action.h" |
19 | | |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <chrono> // IWYU pragma: keep |
22 | | #include <exception> |
23 | | #include <future> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "absl/strings/substitute.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/metrics/doris_metrics.h" |
34 | | #include "common/status.h" |
35 | | #include "service/http/http_channel.h" |
36 | | #include "service/http/http_headers.h" |
37 | | #include "service/http/http_request.h" |
38 | | #include "service/http/http_status.h" |
39 | | #include "storage/compaction/base_compaction.h" |
40 | | #include "storage/compaction/cumulative_compaction.h" |
41 | | #include "storage/compaction/cumulative_compaction_policy.h" |
42 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
43 | | #include "storage/compaction/single_replica_compaction.h" |
44 | | #include "storage/compaction_task_tracker.h" |
45 | | #include "storage/olap_define.h" |
46 | | #include "storage/storage_engine.h" |
47 | | #include "storage/tablet/tablet_manager.h" |
48 | | #include "util/stopwatch.hpp" |
49 | | |
50 | | namespace doris { |
51 | | using namespace ErrorCode; |
52 | | |
53 | | namespace { |
54 | | |
55 | | constexpr std::string_view HEADER_JSON = "application/json"; |
56 | | |
57 | | } // namespace |
58 | | |
59 | | CompactionAction::CompactionAction(CompactionActionType ctype, ExecEnv* exec_env, |
60 | | StorageEngine& engine, TPrivilegeHier::type hier, |
61 | | TPrivilegeType::type ptype) |
62 | 16 | : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {} |
63 | | |
64 | | /// check param and fetch tablet_id & table_id from req |
65 | 12 | static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { |
66 | | // req tablet id and table id, we have to set only one of them. |
67 | 12 | const auto& req_tablet_id = req->param(TABLET_ID_KEY); |
68 | 12 | const auto& req_table_id = req->param(TABLE_ID_KEY); |
69 | 12 | if (req_tablet_id.empty()) { |
70 | 2 | if (req_table_id.empty()) { |
71 | | // both tablet id and table id are empty, return error. |
72 | 1 | return Status::InternalError( |
73 | 1 | "tablet id and table id can not be empty at the same time!"); |
74 | 1 | } else { |
75 | 1 | try { |
76 | 1 | *table_id = std::stoull(req_table_id); |
77 | 1 | } catch (const std::exception& e) { |
78 | 0 | return Status::InternalError("convert table_id failed, {}", e.what()); |
79 | 0 | } |
80 | 1 | return Status::OK(); |
81 | 1 | } |
82 | 10 | } else if (req_table_id.empty()) { |
83 | 9 | try { |
84 | 9 | *tablet_id = std::stoull(req_tablet_id); |
85 | 9 | } catch (const std::exception& e) { |
86 | 1 | return Status::InternalError("convert tablet_id failed, {}", e.what()); |
87 | 1 | } |
88 | 8 | return Status::OK(); |
89 | 9 | } else { |
90 | | // both tablet id and table id are not empty, return err. |
91 | 1 | return Status::InternalError("tablet id and table id can not be set at the same time!"); |
92 | 1 | } |
93 | 12 | } |
94 | | |
95 | | /// retrieve specific id from req |
96 | 4 | static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) { |
97 | 4 | const auto& req_id_param = req->param(param_name); |
98 | 4 | if (!req_id_param.empty()) { |
99 | 2 | try { |
100 | 2 | *id_param = std::stoull(req_id_param); |
101 | 2 | } catch (const std::exception& e) { |
102 | 0 | return Status::InternalError("convert {} failed, {}", param_name, e.what()); |
103 | 0 | } |
104 | 2 | } |
105 | | |
106 | 4 | return Status::OK(); |
107 | 4 | } |
108 | | |
109 | | // for viewing the compaction status |
110 | 2 | Status CompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) { |
111 | 2 | uint64_t tablet_id = 0; |
112 | | // check & retrieve tablet_id from req if it contains |
113 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
114 | 2 | "check param failed"); |
115 | 2 | if (tablet_id == 0) { |
116 | 1 | return Status::InternalError("check param failed: missing tablet_id"); |
117 | 1 | } |
118 | | |
119 | 1 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
120 | 1 | if (tablet == nullptr) { |
121 | 1 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
122 | 1 | } |
123 | | |
124 | 0 | tablet->get_compaction_status(json_result); |
125 | 0 | return Status::OK(); |
126 | 1 | } |
127 | | |
128 | 12 | Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) { |
129 | | // 1. param check |
130 | | // check req_tablet_id or req_table_id is not empty and can not be set together. |
131 | 12 | uint64_t tablet_id = 0; |
132 | 12 | uint64_t table_id = 0; |
133 | 12 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed"); |
134 | | |
135 | | // check compaction_type equals 'base' or 'cumulative' |
136 | 9 | std::string compaction_type = req->param(PARAM_COMPACTION_TYPE); |
137 | 9 | if (compaction_type != PARAM_COMPACTION_BASE && |
138 | 9 | compaction_type != PARAM_COMPACTION_CUMULATIVE && |
139 | 9 | compaction_type != PARAM_COMPACTION_FULL) { |
140 | 1 | return Status::NotSupported("The compaction type '{}' is not supported", compaction_type); |
141 | 1 | } |
142 | | |
143 | | // "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer |
144 | 8 | bool fetch_from_remote = false; |
145 | 8 | std::string param_remote = req->param(PARAM_COMPACTION_REMOTE); |
146 | 8 | if (param_remote == "true") { |
147 | 0 | fetch_from_remote = true; |
148 | 8 | } else if (!param_remote.empty() && param_remote != "false") { |
149 | 1 | return Status::NotSupported("The remote = '{}' is not supported", param_remote); |
150 | 1 | } |
151 | | |
152 | | // "force" = "true" means skip permit limiter when submitting full compaction to thread pool |
153 | 7 | bool force = false; |
154 | 7 | std::string param_force = req->param(PARAM_COMPACTION_FORCE); |
155 | 7 | if (param_force == "true") { |
156 | 2 | force = true; |
157 | 5 | } else if (!param_force.empty() && param_force != "false") { |
158 | 1 | return Status::NotSupported("The force = '{}' is not supported", param_force); |
159 | 1 | } |
160 | | |
161 | 6 | if (tablet_id == 0 && table_id != 0) { |
162 | 1 | std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet( |
163 | 1 | [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); |
164 | 1 | for (const auto& tablet : tablet_vec) { |
165 | 0 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
166 | 0 | RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, |
167 | 0 | force, true, 1)); |
168 | 0 | } |
169 | 5 | } else { |
170 | | // 2. fetch the tablet by tablet_id |
171 | 5 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
172 | 5 | if (tablet == nullptr) { |
173 | 5 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
174 | 5 | } |
175 | | |
176 | 0 | if (fetch_from_remote && !tablet->should_fetch_from_peer()) { |
177 | 0 | return Status::NotSupported("tablet should do compaction locally"); |
178 | 0 | } |
179 | 0 | DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", { |
180 | 0 | RETURN_IF_ERROR(_engine.submit_compaction_task( |
181 | 0 | tablet, CompactionType::CUMULATIVE_COMPACTION, false)); |
182 | 0 | LOG(INFO) << "Manual debug compaction task is successfully triggered"; |
183 | 0 | *json_result = |
184 | 0 | R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" + |
185 | 0 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
186 | 0 | return Status::OK(); |
187 | 0 | }) |
188 | | |
189 | 0 | if (compaction_type == PARAM_COMPACTION_FULL) { |
190 | | // 3. submit full compaction task to thread pool |
191 | 0 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
192 | 0 | RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, |
193 | 0 | force, true, 1)); |
194 | 0 | } else { |
195 | | // 3. execute base/cumulative compaction task in a detached thread |
196 | 0 | std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() { |
197 | 0 | return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote); |
198 | 0 | }); |
199 | 0 | std::future<Status> future_obj = task.get_future(); |
200 | 0 | std::thread(std::move(task)).detach(); |
201 | |
|
202 | 0 | if (compaction_type == PARAM_COMPACTION_BASE) { |
203 | 0 | tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
204 | 0 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
205 | 0 | tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
206 | 0 | } |
207 | | |
208 | | // 4. wait for result for 2 seconds by async |
209 | 0 | std::future_status status = future_obj.wait_for(std::chrono::seconds(2)); |
210 | 0 | if (status == std::future_status::ready) { |
211 | 0 | Status olap_status = future_obj.get(); |
212 | 0 | if (!olap_status.ok()) { |
213 | 0 | return olap_status; |
214 | 0 | } |
215 | 0 | } else { |
216 | 0 | LOG(INFO) << "Manual compaction task is timeout for waiting " |
217 | 0 | << (status == std::future_status::timeout); |
218 | 0 | } |
219 | 0 | } |
220 | 0 | } |
221 | 6 | LOG(INFO) << "Manual compaction task is successfully triggered"; |
222 | 1 | *json_result = |
223 | 1 | R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" + |
224 | 1 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
225 | 1 | return Status::OK(); |
226 | 6 | } |
227 | | |
228 | 2 | Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::string* json_result) { |
229 | 2 | uint64_t tablet_id = 0; |
230 | | // check & retrieve tablet_id from req if it contains |
231 | 2 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
232 | 2 | "check param failed"); |
233 | | |
234 | 2 | if (tablet_id == 0) { |
235 | | // overall compaction status |
236 | 1 | _engine.get_compaction_status_json(json_result); |
237 | 1 | return Status::OK(); |
238 | 1 | } else { |
239 | | // fetch the tablet by tablet_id |
240 | 1 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
241 | 1 | if (tablet == nullptr) { |
242 | 1 | LOG(WARNING) << "invalid argument.tablet_id:" << tablet_id; |
243 | 1 | return Status::InternalError("fail to get {}", tablet_id); |
244 | 1 | } |
245 | | |
246 | 0 | std::string json_template = R"({ |
247 | 0 | "status" : "Success", |
248 | 0 | "run_status" : $0, |
249 | 0 | "msg" : "$1", |
250 | 0 | "tablet_id" : $2, |
251 | 0 | "compact_type" : "$3" |
252 | 0 | })"; |
253 | |
|
254 | 0 | std::string msg = "compaction task for this tablet is not running"; |
255 | 0 | std::string compaction_type; |
256 | 0 | bool run_status = false; |
257 | |
|
258 | 0 | { |
259 | | // Full compaction holds both base compaction lock and cumu compaction lock. |
260 | | // So we can not judge if full compaction is running by check these two locks holding. |
261 | | // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running. |
262 | 0 | if (tablet->is_full_compaction_running()) { |
263 | 0 | msg = "compaction task for this tablet is running"; |
264 | 0 | compaction_type = "full"; |
265 | 0 | run_status = true; |
266 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
267 | 0 | compaction_type); |
268 | 0 | return Status::OK(); |
269 | 0 | } |
270 | 0 | } |
271 | | |
272 | 0 | { |
273 | | // use try lock to check this tablet is running cumulative compaction |
274 | 0 | std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(), |
275 | 0 | std::try_to_lock); |
276 | 0 | if (!lock_cumulative.owns_lock()) { |
277 | 0 | msg = "compaction task for this tablet is running"; |
278 | 0 | compaction_type = "cumulative"; |
279 | 0 | run_status = true; |
280 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
281 | 0 | compaction_type); |
282 | 0 | return Status::OK(); |
283 | 0 | } |
284 | 0 | } |
285 | | |
286 | 0 | { |
287 | | // use try lock to check this tablet is running base compaction |
288 | 0 | std::unique_lock<std::mutex> lock_base(tablet->get_base_compaction_lock(), |
289 | 0 | std::try_to_lock); |
290 | 0 | if (!lock_base.owns_lock()) { |
291 | 0 | msg = "compaction task for this tablet is running"; |
292 | 0 | compaction_type = "base"; |
293 | 0 | run_status = true; |
294 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
295 | 0 | compaction_type); |
296 | 0 | return Status::OK(); |
297 | 0 | } |
298 | 0 | } |
299 | | // not running any compaction |
300 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
301 | 0 | return Status::OK(); |
302 | 0 | } |
303 | 2 | } |
304 | | |
305 | | Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, |
306 | | const std::string& compaction_type, |
307 | 0 | bool fetch_from_remote) { |
308 | 0 | MonotonicStopWatch timer; |
309 | 0 | timer.start(); |
310 | |
|
311 | 0 | std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = |
312 | 0 | CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
313 | 0 | tablet->tablet_meta()->compaction_policy()); |
314 | 0 | if (tablet->get_cumulative_compaction_policy() == nullptr) { |
315 | 0 | tablet->set_cumulative_compaction_policy(cumulative_compaction_policy); |
316 | 0 | } |
317 | 0 | Status res = Status::OK(); |
318 | 0 | auto* tracker = CompactionTaskTracker::instance(); |
319 | 0 | auto do_compact = [&](Compaction& compaction, CompactionProfileType profile_type) { |
320 | 0 | RETURN_IF_ERROR(compaction.prepare_compact()); |
321 | | // Register task as RUNNING with tracker (manual trigger, direct execution path) |
322 | | // Use compaction.compaction_id() which was allocated in constructor. |
323 | 0 | int64_t compaction_id = compaction.compaction_id(); |
324 | 0 | { |
325 | 0 | CompactionTaskInfo info; |
326 | 0 | info.compaction_id = compaction_id; |
327 | 0 | info.tablet_id = tablet->tablet_id(); |
328 | 0 | info.table_id = tablet->get_table_id(); |
329 | 0 | info.partition_id = tablet->partition_id(); |
330 | 0 | info.compaction_type = profile_type; |
331 | 0 | info.status = CompactionTaskStatus::RUNNING; |
332 | 0 | info.trigger_method = TriggerMethod::MANUAL; |
333 | 0 | auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
334 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
335 | 0 | .count(); |
336 | 0 | info.scheduled_time_ms = now_ms; |
337 | 0 | info.start_time_ms = now_ms; |
338 | 0 | info.compaction_score = tablet->get_real_compaction_score(); |
339 | 0 | info.input_rowsets_count = compaction.input_rowsets_count(); |
340 | 0 | info.input_row_num = compaction.input_row_num_value(); |
341 | 0 | info.input_data_size = compaction.input_rowsets_data_size(); |
342 | 0 | info.input_index_size = compaction.input_rowsets_index_size(); |
343 | 0 | info.input_total_size = compaction.input_rowsets_total_size(); |
344 | 0 | info.input_segments_num = compaction.input_segments_num_value(); |
345 | 0 | info.input_version_range = compaction.input_version_range_str(); |
346 | 0 | info.is_vertical = compaction.is_vertical(); |
347 | 0 | info.backend_id = BackendOptions::get_backend_id(); |
348 | 0 | tracker->register_task(std::move(info)); |
349 | 0 | } |
350 | 0 | auto st = compaction.execute_compact(); |
351 | | // Idempotent cleanup: if execute returned early (e.g. TRY_LOCK_FAILED) |
352 | | // before submit_profile_record was called, task remains in active_tasks. |
353 | 0 | tracker->remove_task(compaction_id); |
354 | 0 | return st; |
355 | 0 | }; |
356 | 0 | if (compaction_type == PARAM_COMPACTION_BASE) { |
357 | 0 | BaseCompaction base_compaction(_engine, tablet); |
358 | 0 | res = do_compact(base_compaction, CompactionProfileType::BASE); |
359 | 0 | if (!res) { |
360 | 0 | if (!res.is<BE_NO_SUITABLE_VERSION>()) { |
361 | 0 | DorisMetrics::instance()->base_compaction_request_failed->increment(1); |
362 | 0 | } |
363 | 0 | } |
364 | 0 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
365 | 0 | if (fetch_from_remote) { |
366 | 0 | SingleReplicaCompaction single_compaction(_engine, tablet, |
367 | 0 | CompactionType::CUMULATIVE_COMPACTION); |
368 | 0 | res = do_compact(single_compaction, CompactionProfileType::CUMULATIVE); |
369 | 0 | if (!res) { |
370 | 0 | LOG(WARNING) << "failed to do single compaction. res=" << res |
371 | 0 | << ", table=" << tablet->tablet_id(); |
372 | 0 | } |
373 | 0 | } else { |
374 | 0 | CumulativeCompaction cumulative_compaction(_engine, tablet); |
375 | 0 | res = do_compact(cumulative_compaction, CompactionProfileType::CUMULATIVE); |
376 | 0 | if (!res) { |
377 | 0 | if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) { |
378 | | // Ignore this error code. |
379 | 0 | VLOG_NOTICE |
380 | 0 | << "failed to init cumulative compaction due to no suitable version," |
381 | 0 | << "tablet=" << tablet->tablet_id(); |
382 | 0 | } else { |
383 | 0 | DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); |
384 | 0 | LOG(WARNING) << "failed to do cumulative compaction. res=" << res |
385 | 0 | << ", table=" << tablet->tablet_id(); |
386 | 0 | } |
387 | 0 | } |
388 | 0 | } |
389 | 0 | } |
390 | 0 | timer.stop(); |
391 | 0 | LOG(INFO) << "Manual compaction task finish, status=" << res |
392 | 0 | << ", compaction_use_time=" << timer.elapsed_time() / 1000000 << "ms"; |
393 | 0 | return res; |
394 | 0 | } |
395 | | |
396 | 0 | void CompactionAction::handle(HttpRequest* req) { |
397 | 0 | req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); |
398 | |
|
399 | 0 | if (_compaction_type == CompactionActionType::SHOW_INFO) { |
400 | 0 | std::string json_result; |
401 | 0 | Status st = _handle_show_compaction(req, &json_result); |
402 | 0 | if (!st.ok()) { |
403 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
404 | 0 | } else { |
405 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
406 | 0 | } |
407 | 0 | } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) { |
408 | 0 | std::string json_result; |
409 | 0 | Status st = _handle_run_compaction(req, &json_result); |
410 | 0 | if (!st.ok()) { |
411 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
412 | 0 | } else { |
413 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
414 | 0 | } |
415 | 0 | } else { |
416 | 0 | std::string json_result; |
417 | 0 | Status st = _handle_run_status_compaction(req, &json_result); |
418 | 0 | if (!st.ok()) { |
419 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
420 | 0 | } else { |
421 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
422 | 0 | } |
423 | 0 | } |
424 | 0 | } |
425 | | |
426 | | } // end namespace doris |