be/src/cloud/cloud_compaction_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_compaction_action.h" |
19 | | |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <chrono> // IWYU pragma: keep |
22 | | #include <exception> |
23 | | #include <future> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "absl/strings/substitute.h" |
32 | | #include "cloud/cloud_base_compaction.h" |
33 | | #include "cloud/cloud_compaction_action.h" |
34 | | #include "cloud/cloud_cumulative_compaction.h" |
35 | | #include "cloud/cloud_full_compaction.h" |
36 | | #include "cloud/cloud_tablet.h" |
37 | | #include "cloud/cloud_tablet_mgr.h" |
38 | | #include "common/logging.h" |
39 | | #include "common/metrics/doris_metrics.h" |
40 | | #include "common/status.h" |
41 | | #include "service/http/http_channel.h" |
42 | | #include "service/http/http_headers.h" |
43 | | #include "service/http/http_request.h" |
44 | | #include "service/http/http_status.h" |
45 | | #include "storage/compaction/base_compaction.h" |
46 | | #include "storage/compaction/cumulative_compaction.h" |
47 | | #include "storage/compaction/cumulative_compaction_policy.h" |
48 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
49 | | #include "storage/compaction/full_compaction.h" |
50 | | #include "storage/compaction_task_tracker.h" |
51 | | #include "storage/olap_define.h" |
52 | | #include "storage/storage_engine.h" |
53 | | #include "storage/tablet/tablet_manager.h" |
54 | | #include "util/stopwatch.hpp" |
55 | | |
56 | | namespace doris { |
57 | | using namespace ErrorCode; |
58 | | |
59 | | namespace {} |
60 | | |
61 | | const static std::string HEADER_JSON = "application/json"; |
62 | | |
63 | | CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, |
64 | | CloudStorageEngine& engine, TPrivilegeHier::type hier, |
65 | | TPrivilegeType::type ptype) |
66 | 3 | : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {} |
67 | | |
68 | | /// check param and fetch tablet_id & table_id from req |
69 | 596 | static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { |
70 | | // req tablet id and table id, we have to set only one of them. |
71 | 596 | std::string req_tablet_id = req->param(TABLET_ID_KEY); |
72 | 596 | std::string req_table_id = req->param(TABLE_ID_KEY); |
73 | 596 | if (req_tablet_id == "") { |
74 | 100 | if (req_table_id == "") { |
75 | | // both tablet id and table id are empty, return error. |
76 | 0 | return Status::InternalError( |
77 | 0 | "tablet id and table id can not be empty at the same time!"); |
78 | 100 | } else { |
79 | 100 | try { |
80 | 100 | *table_id = std::stoull(req_table_id); |
81 | 100 | } catch (const std::exception& e) { |
82 | 0 | return Status::InternalError("convert table_id failed, {}", e.what()); |
83 | 0 | } |
84 | 100 | return Status::OK(); |
85 | 100 | } |
86 | 496 | } else { |
87 | 496 | if (req_table_id == "") { |
88 | 496 | try { |
89 | 496 | *tablet_id = std::stoull(req_tablet_id); |
90 | 496 | } catch (const std::exception& e) { |
91 | 0 | return Status::InternalError("convert tablet_id failed, {}", e.what()); |
92 | 0 | } |
93 | 496 | return Status::OK(); |
94 | 496 | } else { |
95 | | // both tablet id and table id are not empty, return err. |
96 | 0 | return Status::InternalError("tablet id and table id can not be set at the same time!"); |
97 | 0 | } |
98 | 496 | } |
99 | 596 | } |
100 | | |
101 | | /// retrieve specific id from req |
102 | 3.18k | static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) { |
103 | 3.18k | const auto& req_id_param = req->param(param_name); |
104 | 3.18k | if (!req_id_param.empty()) { |
105 | 3.18k | try { |
106 | 3.18k | *id_param = std::stoull(req_id_param); |
107 | 3.18k | } catch (const std::exception& e) { |
108 | 0 | return Status::InternalError("convert {} failed, {}", param_name, e.what()); |
109 | 0 | } |
110 | 3.18k | } |
111 | | |
112 | 3.18k | return Status::OK(); |
113 | 3.18k | } |
114 | | |
115 | | // for viewing the compaction status |
116 | 2.61k | Status CloudCompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) { |
117 | 2.61k | uint64_t tablet_id = 0; |
118 | 2.61k | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
119 | 2.61k | "check param failed"); |
120 | 2.61k | if (tablet_id == 0) { |
121 | 0 | return Status::InternalError("check param failed: missing tablet_id"); |
122 | 0 | } |
123 | | |
124 | 2.61k | LOG(INFO) << "begin to handle show compaction, tablet id: " << tablet_id; |
125 | | |
126 | | //TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
127 | 2.61k | CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id)); |
128 | 2.61k | if (tablet == nullptr) { |
129 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
130 | 0 | } |
131 | | |
132 | 2.61k | tablet->get_compaction_status(json_result); |
133 | 2.61k | LOG(INFO) << "finished to handle show compaction, tablet id: " << tablet_id; |
134 | 2.61k | return Status::OK(); |
135 | 2.61k | } |
136 | | |
137 | 596 | Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) { |
138 | | // 1. param check |
139 | | // check req_tablet_id or req_table_id is not empty and can not be set together. |
140 | 596 | uint64_t tablet_id = 0; |
141 | 596 | uint64_t table_id = 0; |
142 | 596 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed"); |
143 | 596 | LOG(INFO) << "begin to handle run compaction, tablet id: " << tablet_id |
144 | 596 | << " table id: " << table_id; |
145 | | |
146 | | // check compaction_type equals 'base' or 'cumulative' |
147 | 596 | auto& compaction_type = req->param(PARAM_COMPACTION_TYPE); |
148 | 596 | if (compaction_type != PARAM_COMPACTION_BASE && |
149 | 596 | compaction_type != PARAM_COMPACTION_CUMULATIVE && |
150 | 596 | compaction_type != PARAM_COMPACTION_FULL) { |
151 | 0 | return Status::NotSupported("The compaction type '{}' is not supported", compaction_type); |
152 | 0 | } |
153 | 596 | bool sync_delete_bitmap = compaction_type != PARAM_COMPACTION_FULL; |
154 | 596 | CloudTabletSPtr tablet = |
155 | 596 | DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id, false, sync_delete_bitmap)); |
156 | 496 | if (tablet == nullptr) { |
157 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
158 | 0 | } |
159 | | |
160 | 496 | if (compaction_type == PARAM_COMPACTION_BASE) { |
161 | 1 | tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
162 | 495 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
163 | 390 | tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
164 | 390 | } else if (compaction_type == PARAM_COMPACTION_FULL) { |
165 | 105 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
166 | 105 | } |
167 | | |
168 | 496 | LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id |
169 | 496 | << " table id: " << table_id; |
170 | | // 3. submit compaction task (trigger_method=1 for MANUAL) |
171 | 496 | RETURN_IF_ERROR(_engine.submit_compaction_task( |
172 | 496 | tablet, |
173 | 496 | compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION |
174 | 496 | : compaction_type == PARAM_COMPACTION_CUMULATIVE ? CompactionType::CUMULATIVE_COMPACTION |
175 | 496 | : CompactionType::FULL_COMPACTION, |
176 | 496 | /*trigger_method=*/1)); |
177 | | |
178 | 496 | LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id |
179 | 456 | << " table id: " << table_id; |
180 | 456 | *json_result = |
181 | 456 | R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" + |
182 | 456 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
183 | 456 | return Status::OK(); |
184 | 496 | } |
185 | | |
186 | | Status CloudCompactionAction::_handle_run_status_compaction(HttpRequest* req, |
187 | 573 | std::string* json_result) { |
188 | 573 | uint64_t tablet_id = 0; |
189 | 573 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
190 | 573 | "check param failed"); |
191 | 573 | LOG(INFO) << "begin to handle run status compaction, tablet id: " << tablet_id; |
192 | | |
193 | 573 | if (tablet_id == 0) { |
194 | | // overall compaction status |
195 | 0 | RETURN_IF_ERROR(_engine.get_compaction_status_json(json_result)); |
196 | 573 | } else { |
197 | 573 | std::string json_template = R"({ |
198 | 573 | "status" : "Success", |
199 | 573 | "run_status" : $0, |
200 | 573 | "msg" : "$1", |
201 | 573 | "tablet_id" : $2, |
202 | 573 | "compact_type" : "$3" |
203 | 573 | })"; |
204 | | |
205 | 573 | std::string msg = "compaction task for this tablet is not running"; |
206 | 573 | std::string compaction_type; |
207 | 573 | bool run_status = false; |
208 | | |
209 | 573 | if (_engine.has_cumu_compaction(tablet_id)) { |
210 | 49 | msg = "compaction task for this tablet is running"; |
211 | 49 | compaction_type = "cumulative"; |
212 | 49 | run_status = true; |
213 | 49 | *json_result = |
214 | 49 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
215 | 49 | return Status::OK(); |
216 | 49 | } |
217 | | |
218 | 524 | if (_engine.has_base_compaction(tablet_id)) { |
219 | 0 | msg = "compaction task for this tablet is running"; |
220 | 0 | compaction_type = "base"; |
221 | 0 | run_status = true; |
222 | 0 | *json_result = |
223 | 0 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
224 | 0 | return Status::OK(); |
225 | 0 | } |
226 | | |
227 | 524 | if (_engine.has_full_compaction(tablet_id)) { |
228 | 39 | msg = "compaction task for this tablet is running"; |
229 | 39 | compaction_type = "full"; |
230 | 39 | run_status = true; |
231 | 39 | *json_result = |
232 | 39 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
233 | 39 | return Status::OK(); |
234 | 39 | } |
235 | | // not running any compaction |
236 | 485 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
237 | 485 | } |
238 | 573 | LOG(INFO) << "finished to handle run status compaction, tablet id: " << tablet_id; |
239 | 485 | return Status::OK(); |
240 | 573 | } |
241 | | |
242 | 3.78k | void CloudCompactionAction::handle(HttpRequest* req) { |
243 | 3.78k | req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); |
244 | | |
245 | 3.78k | if (_compaction_type == CompactionActionType::SHOW_INFO) { |
246 | 2.61k | std::string json_result; |
247 | 2.61k | Status st = _handle_show_compaction(req, &json_result); |
248 | 2.61k | if (!st.ok()) { |
249 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
250 | 2.61k | } else { |
251 | 2.61k | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
252 | 2.61k | } |
253 | 2.61k | } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) { |
254 | 596 | std::string json_result; |
255 | 596 | Status st = _handle_run_compaction(req, &json_result); |
256 | 596 | if (!st.ok()) { |
257 | 140 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
258 | 456 | } else { |
259 | 456 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
260 | 456 | } |
261 | 596 | } else { |
262 | 573 | std::string json_result; |
263 | 573 | Status st = _handle_run_status_compaction(req, &json_result); |
264 | 573 | if (!st.ok()) { |
265 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
266 | 573 | } else { |
267 | 573 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
268 | 573 | } |
269 | 573 | } |
270 | 3.78k | } |
271 | | |
272 | | } // end namespace doris |