be/src/service/http/action/compaction_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/compaction_action.h" |
19 | | |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <chrono> // IWYU pragma: keep |
22 | | #include <exception> |
23 | | #include <future> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "absl/strings/substitute.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/metrics/doris_metrics.h" |
34 | | #include "common/status.h" |
35 | | #include "service/http/http_channel.h" |
36 | | #include "service/http/http_headers.h" |
37 | | #include "service/http/http_request.h" |
38 | | #include "service/http/http_status.h" |
39 | | #include "storage/compaction/base_compaction.h" |
40 | | #include "storage/compaction/cumulative_compaction.h" |
41 | | #include "storage/compaction/cumulative_compaction_policy.h" |
42 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
43 | | #include "storage/compaction/full_compaction.h" |
44 | | #include "storage/compaction/single_replica_compaction.h" |
45 | | #include "storage/olap_define.h" |
46 | | #include "storage/storage_engine.h" |
47 | | #include "storage/tablet/tablet_manager.h" |
48 | | #include "util/stopwatch.hpp" |
49 | | |
50 | | namespace doris { |
51 | | using namespace ErrorCode; |
52 | | |
53 | | namespace { |
54 | | |
55 | | constexpr std::string_view HEADER_JSON = "application/json"; |
56 | | |
57 | | } // namespace |
58 | | |
59 | | CompactionAction::CompactionAction(CompactionActionType ctype, ExecEnv* exec_env, |
60 | | StorageEngine& engine, TPrivilegeHier::type hier, |
61 | | TPrivilegeType::type ptype) |
62 | 18 | : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {} |
63 | | |
64 | | /// check param and fetch tablet_id & table_id from req |
65 | 0 | static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { |
66 | | // req tablet id and table id, we have to set only one of them. |
67 | 0 | const auto& req_tablet_id = req->param(TABLET_ID_KEY); |
68 | 0 | const auto& req_table_id = req->param(TABLE_ID_KEY); |
69 | 0 | if (req_tablet_id.empty()) { |
70 | 0 | if (req_table_id.empty()) { |
71 | | // both tablet id and table id are empty, return error. |
72 | 0 | return Status::InternalError( |
73 | 0 | "tablet id and table id can not be empty at the same time!"); |
74 | 0 | } else { |
75 | 0 | try { |
76 | 0 | *table_id = std::stoull(req_table_id); |
77 | 0 | } catch (const std::exception& e) { |
78 | 0 | return Status::InternalError("convert table_id failed, {}", e.what()); |
79 | 0 | } |
80 | 0 | return Status::OK(); |
81 | 0 | } |
82 | 0 | } else if (req_table_id.empty()) { |
83 | 0 | try { |
84 | 0 | *tablet_id = std::stoull(req_tablet_id); |
85 | 0 | } catch (const std::exception& e) { |
86 | 0 | return Status::InternalError("convert tablet_id failed, {}", e.what()); |
87 | 0 | } |
88 | 0 | return Status::OK(); |
89 | 0 | } else { |
90 | | // both tablet id and table id are not empty, return err. |
91 | 0 | return Status::InternalError("tablet id and table id can not be set at the same time!"); |
92 | 0 | } |
93 | 0 | } |
94 | | |
95 | | /// retrieve specific id from req |
96 | 0 | static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) { |
97 | 0 | const auto& req_id_param = req->param(param_name); |
98 | 0 | if (!req_id_param.empty()) { |
99 | 0 | try { |
100 | 0 | *id_param = std::stoull(req_id_param); |
101 | 0 | } catch (const std::exception& e) { |
102 | 0 | return Status::InternalError("convert {} failed, {}", param_name, e.what()); |
103 | 0 | } |
104 | 0 | } |
105 | | |
106 | 0 | return Status::OK(); |
107 | 0 | } |
108 | | |
109 | | // for viewing the compaction status |
110 | 0 | Status CompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) { |
111 | 0 | uint64_t tablet_id = 0; |
112 | | // check & retrieve tablet_id from req if it contains |
113 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
114 | 0 | "check param failed"); |
115 | 0 | if (tablet_id == 0) { |
116 | 0 | return Status::InternalError("check param failed: missing tablet_id"); |
117 | 0 | } |
118 | | |
119 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
120 | 0 | if (tablet == nullptr) { |
121 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
122 | 0 | } |
123 | | |
124 | 0 | tablet->get_compaction_status(json_result); |
125 | 0 | return Status::OK(); |
126 | 0 | } |
127 | | |
128 | 0 | Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) { |
129 | | // 1. param check |
130 | | // check req_tablet_id or req_table_id is not empty and can not be set together. |
131 | 0 | uint64_t tablet_id = 0; |
132 | 0 | uint64_t table_id = 0; |
133 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed"); |
134 | | |
135 | | // check compaction_type equals 'base' or 'cumulative' |
136 | 0 | std::string compaction_type = req->param(PARAM_COMPACTION_TYPE); |
137 | 0 | if (compaction_type != PARAM_COMPACTION_BASE && |
138 | 0 | compaction_type != PARAM_COMPACTION_CUMULATIVE && |
139 | 0 | compaction_type != PARAM_COMPACTION_FULL) { |
140 | 0 | return Status::NotSupported("The compaction type '{}' is not supported", compaction_type); |
141 | 0 | } |
142 | | |
143 | | // "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer |
144 | 0 | bool fetch_from_remote = false; |
145 | 0 | std::string param_remote = req->param(PARAM_COMPACTION_REMOTE); |
146 | 0 | if (param_remote == "true") { |
147 | 0 | fetch_from_remote = true; |
148 | 0 | } else if (!param_remote.empty() && param_remote != "false") { |
149 | 0 | return Status::NotSupported("The remote = '{}' is not supported", param_remote); |
150 | 0 | } |
151 | | |
152 | 0 | if (tablet_id == 0 && table_id != 0) { |
153 | 0 | std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet( |
154 | 0 | [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; }); |
155 | 0 | for (const auto& tablet : tablet_vec) { |
156 | 0 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
157 | 0 | RETURN_IF_ERROR( |
158 | 0 | _engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false)); |
159 | 0 | } |
160 | 0 | } else { |
161 | | // 2. fetch the tablet by tablet_id |
162 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
163 | 0 | if (tablet == nullptr) { |
164 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
165 | 0 | } |
166 | | |
167 | 0 | if (fetch_from_remote && !tablet->should_fetch_from_peer()) { |
168 | 0 | return Status::NotSupported("tablet should do compaction locally"); |
169 | 0 | } |
170 | 0 | DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", { |
171 | 0 | RETURN_IF_ERROR(_engine.submit_compaction_task( |
172 | 0 | tablet, CompactionType::CUMULATIVE_COMPACTION, false)); |
173 | 0 | LOG(INFO) << "Manual debug compaction task is successfully triggered"; |
174 | 0 | *json_result = |
175 | 0 | R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" + |
176 | 0 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
177 | 0 | return Status::OK(); |
178 | 0 | }) |
179 | | |
180 | | // 3. execute compaction task |
181 | 0 | std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() { |
182 | 0 | return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote); |
183 | 0 | }); |
184 | 0 | std::future<Status> future_obj = task.get_future(); |
185 | 0 | std::thread(std::move(task)).detach(); |
186 | | |
187 | | // 更新schedule_time |
188 | 0 | if (compaction_type == PARAM_COMPACTION_BASE) { |
189 | 0 | tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
190 | 0 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
191 | 0 | tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
192 | 0 | } else if (compaction_type == PARAM_COMPACTION_FULL) { |
193 | 0 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
194 | 0 | } |
195 | | |
196 | | // 4. wait for result for 2 seconds by async |
197 | 0 | std::future_status status = future_obj.wait_for(std::chrono::seconds(2)); |
198 | 0 | if (status == std::future_status::ready) { |
199 | | // fetch execute result |
200 | 0 | Status olap_status = future_obj.get(); |
201 | 0 | if (!olap_status.ok()) { |
202 | 0 | return olap_status; |
203 | 0 | } |
204 | 0 | } else { |
205 | 0 | LOG(INFO) << "Manual compaction task is timeout for waiting " |
206 | 0 | << (status == std::future_status::timeout); |
207 | 0 | } |
208 | 0 | } |
209 | 0 | LOG(INFO) << "Manual compaction task is successfully triggered"; |
210 | 0 | *json_result = |
211 | 0 | R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" + |
212 | 0 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
213 | 0 | return Status::OK(); |
214 | 0 | } |
215 | | |
216 | 0 | Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::string* json_result) { |
217 | 0 | uint64_t tablet_id = 0; |
218 | | // check & retrieve tablet_id from req if it contains |
219 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
220 | 0 | "check param failed"); |
221 | |
|
222 | 0 | if (tablet_id == 0) { |
223 | | // overall compaction status |
224 | 0 | _engine.get_compaction_status_json(json_result); |
225 | 0 | return Status::OK(); |
226 | 0 | } else { |
227 | | // fetch the tablet by tablet_id |
228 | 0 | TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
229 | 0 | if (tablet == nullptr) { |
230 | 0 | LOG(WARNING) << "invalid argument.tablet_id:" << tablet_id; |
231 | 0 | return Status::InternalError("fail to get {}", tablet_id); |
232 | 0 | } |
233 | | |
234 | 0 | std::string json_template = R"({ |
235 | 0 | "status" : "Success", |
236 | 0 | "run_status" : $0, |
237 | 0 | "msg" : "$1", |
238 | 0 | "tablet_id" : $2, |
239 | 0 | "compact_type" : "$3" |
240 | 0 | })"; |
241 | |
|
242 | 0 | std::string msg = "compaction task for this tablet is not running"; |
243 | 0 | std::string compaction_type; |
244 | 0 | bool run_status = false; |
245 | |
|
246 | 0 | { |
247 | | // Full compaction holds both base compaction lock and cumu compaction lock. |
248 | | // So we can not judge if full compaction is running by check these two locks holding. |
249 | | // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running. |
250 | 0 | if (tablet->is_full_compaction_running()) { |
251 | 0 | msg = "compaction task for this tablet is running"; |
252 | 0 | compaction_type = "full"; |
253 | 0 | run_status = true; |
254 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
255 | 0 | compaction_type); |
256 | 0 | return Status::OK(); |
257 | 0 | } |
258 | 0 | } |
259 | | |
260 | 0 | { |
261 | | // use try lock to check this tablet is running cumulative compaction |
262 | 0 | std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(), |
263 | 0 | std::try_to_lock); |
264 | 0 | if (!lock_cumulative.owns_lock()) { |
265 | 0 | msg = "compaction task for this tablet is running"; |
266 | 0 | compaction_type = "cumulative"; |
267 | 0 | run_status = true; |
268 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
269 | 0 | compaction_type); |
270 | 0 | return Status::OK(); |
271 | 0 | } |
272 | 0 | } |
273 | | |
274 | 0 | { |
275 | | // use try lock to check this tablet is running base compaction |
276 | 0 | std::unique_lock<std::mutex> lock_base(tablet->get_base_compaction_lock(), |
277 | 0 | std::try_to_lock); |
278 | 0 | if (!lock_base.owns_lock()) { |
279 | 0 | msg = "compaction task for this tablet is running"; |
280 | 0 | compaction_type = "base"; |
281 | 0 | run_status = true; |
282 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, |
283 | 0 | compaction_type); |
284 | 0 | return Status::OK(); |
285 | 0 | } |
286 | 0 | } |
287 | | // not running any compaction |
288 | 0 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
289 | 0 | return Status::OK(); |
290 | 0 | } |
291 | 0 | } |
292 | | |
293 | | Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet, |
294 | | const std::string& compaction_type, |
295 | 0 | bool fetch_from_remote) { |
296 | 0 | MonotonicStopWatch timer; |
297 | 0 | timer.start(); |
298 | |
|
299 | 0 | std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy = |
300 | 0 | CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
301 | 0 | tablet->tablet_meta()->compaction_policy()); |
302 | 0 | if (tablet->get_cumulative_compaction_policy() == nullptr) { |
303 | 0 | tablet->set_cumulative_compaction_policy(cumulative_compaction_policy); |
304 | 0 | } |
305 | 0 | Status res = Status::OK(); |
306 | 0 | auto do_compact = [](Compaction& compaction) { |
307 | 0 | RETURN_IF_ERROR(compaction.prepare_compact()); |
308 | 0 | return compaction.execute_compact(); |
309 | 0 | }; |
310 | 0 | if (compaction_type == PARAM_COMPACTION_BASE) { |
311 | 0 | BaseCompaction base_compaction(_engine, tablet); |
312 | 0 | res = do_compact(base_compaction); |
313 | 0 | if (!res) { |
314 | 0 | if (!res.is<BE_NO_SUITABLE_VERSION>()) { |
315 | 0 | DorisMetrics::instance()->base_compaction_request_failed->increment(1); |
316 | 0 | } |
317 | 0 | } |
318 | 0 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
319 | 0 | if (fetch_from_remote) { |
320 | 0 | SingleReplicaCompaction single_compaction(_engine, tablet, |
321 | 0 | CompactionType::CUMULATIVE_COMPACTION); |
322 | 0 | res = do_compact(single_compaction); |
323 | 0 | if (!res) { |
324 | 0 | LOG(WARNING) << "failed to do single compaction. res=" << res |
325 | 0 | << ", table=" << tablet->tablet_id(); |
326 | 0 | } |
327 | 0 | } else { |
328 | 0 | CumulativeCompaction cumulative_compaction(_engine, tablet); |
329 | 0 | res = do_compact(cumulative_compaction); |
330 | 0 | if (!res) { |
331 | 0 | if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) { |
332 | | // Ignore this error code. |
333 | 0 | VLOG_NOTICE |
334 | 0 | << "failed to init cumulative compaction due to no suitable version," |
335 | 0 | << "tablet=" << tablet->tablet_id(); |
336 | 0 | } else { |
337 | 0 | DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); |
338 | 0 | LOG(WARNING) << "failed to do cumulative compaction. res=" << res |
339 | 0 | << ", table=" << tablet->tablet_id(); |
340 | 0 | } |
341 | 0 | } |
342 | 0 | } |
343 | 0 | } else if (compaction_type == PARAM_COMPACTION_FULL) { |
344 | 0 | FullCompaction full_compaction(_engine, tablet); |
345 | 0 | res = do_compact(full_compaction); |
346 | 0 | if (!res) { |
347 | 0 | if (res.is<FULL_NO_SUITABLE_VERSION>()) { |
348 | | // Ignore this error code. |
349 | 0 | VLOG_NOTICE << "failed to init full compaction due to no suitable version," |
350 | 0 | << "tablet=" << tablet->tablet_id(); |
351 | 0 | } else { |
352 | 0 | LOG(WARNING) << "failed to do full compaction. res=" << res |
353 | 0 | << ", table=" << tablet->tablet_id(); |
354 | 0 | } |
355 | 0 | } |
356 | 0 | } |
357 | 0 | timer.stop(); |
358 | 0 | LOG(INFO) << "Manual compaction task finish, status=" << res |
359 | 0 | << ", compaction_use_time=" << timer.elapsed_time() / 1000000 << "ms"; |
360 | 0 | return res; |
361 | 0 | } |
362 | | |
363 | 0 | void CompactionAction::handle(HttpRequest* req) { |
364 | 0 | req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); |
365 | |
|
366 | 0 | if (_compaction_type == CompactionActionType::SHOW_INFO) { |
367 | 0 | std::string json_result; |
368 | 0 | Status st = _handle_show_compaction(req, &json_result); |
369 | 0 | if (!st.ok()) { |
370 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
371 | 0 | } else { |
372 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
373 | 0 | } |
374 | 0 | } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) { |
375 | 0 | std::string json_result; |
376 | 0 | Status st = _handle_run_compaction(req, &json_result); |
377 | 0 | if (!st.ok()) { |
378 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
379 | 0 | } else { |
380 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
381 | 0 | } |
382 | 0 | } else { |
383 | 0 | std::string json_result; |
384 | 0 | Status st = _handle_run_status_compaction(req, &json_result); |
385 | 0 | if (!st.ok()) { |
386 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
387 | 0 | } else { |
388 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
389 | 0 | } |
390 | 0 | } |
391 | 0 | } |
392 | | |
393 | | } // end namespace doris |