be/src/cloud/cloud_compaction_action.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_compaction_action.h" |
19 | | |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <chrono> // IWYU pragma: keep |
22 | | #include <exception> |
23 | | #include <future> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <sstream> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "absl/strings/substitute.h" |
32 | | #include "cloud/cloud_base_compaction.h" |
33 | | #include "cloud/cloud_compaction_action.h" |
34 | | #include "cloud/cloud_cumulative_compaction.h" |
35 | | #include "cloud/cloud_full_compaction.h" |
36 | | #include "cloud/cloud_tablet.h" |
37 | | #include "cloud/cloud_tablet_mgr.h" |
38 | | #include "common/logging.h" |
39 | | #include "common/metrics/doris_metrics.h" |
40 | | #include "common/status.h" |
41 | | #include "service/http/http_channel.h" |
42 | | #include "service/http/http_headers.h" |
43 | | #include "service/http/http_request.h" |
44 | | #include "service/http/http_status.h" |
45 | | #include "storage/compaction/base_compaction.h" |
46 | | #include "storage/compaction/cumulative_compaction.h" |
47 | | #include "storage/compaction/cumulative_compaction_policy.h" |
48 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
49 | | #include "storage/compaction/full_compaction.h" |
50 | | #include "storage/olap_define.h" |
51 | | #include "storage/storage_engine.h" |
52 | | #include "storage/tablet/tablet_manager.h" |
53 | | #include "util/stopwatch.hpp" |
54 | | |
55 | | namespace doris { |
56 | | using namespace ErrorCode; |
57 | | |
58 | | namespace {} |
59 | | |
// Content-Type value attached to every reply produced by this handler.
static const std::string HEADER_JSON = "application/json";
61 | | |
62 | | CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env, |
63 | | CloudStorageEngine& engine, TPrivilegeHier::type hier, |
64 | | TPrivilegeType::type ptype) |
65 | 3 | : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {} |
66 | | |
67 | | /// check param and fetch tablet_id & table_id from req |
68 | 594 | static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) { |
69 | | // req tablet id and table id, we have to set only one of them. |
70 | 594 | std::string req_tablet_id = req->param(TABLET_ID_KEY); |
71 | 594 | std::string req_table_id = req->param(TABLE_ID_KEY); |
72 | 594 | if (req_tablet_id == "") { |
73 | 100 | if (req_table_id == "") { |
74 | | // both tablet id and table id are empty, return error. |
75 | 0 | return Status::InternalError( |
76 | 0 | "tablet id and table id can not be empty at the same time!"); |
77 | 100 | } else { |
78 | 100 | try { |
79 | 100 | *table_id = std::stoull(req_table_id); |
80 | 100 | } catch (const std::exception& e) { |
81 | 0 | return Status::InternalError("convert table_id failed, {}", e.what()); |
82 | 0 | } |
83 | 100 | return Status::OK(); |
84 | 100 | } |
85 | 494 | } else { |
86 | 494 | if (req_table_id == "") { |
87 | 494 | try { |
88 | 494 | *tablet_id = std::stoull(req_tablet_id); |
89 | 494 | } catch (const std::exception& e) { |
90 | 0 | return Status::InternalError("convert tablet_id failed, {}", e.what()); |
91 | 0 | } |
92 | 494 | return Status::OK(); |
93 | 494 | } else { |
94 | | // both tablet id and table id are not empty, return err. |
95 | 0 | return Status::InternalError("tablet id and table id can not be set at the same time!"); |
96 | 0 | } |
97 | 494 | } |
98 | 594 | } |
99 | | |
100 | | /// retrieve specific id from req |
101 | 2.99k | static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) { |
102 | 2.99k | const auto& req_id_param = req->param(param_name); |
103 | 2.99k | if (!req_id_param.empty()) { |
104 | 2.99k | try { |
105 | 2.99k | *id_param = std::stoull(req_id_param); |
106 | 2.99k | } catch (const std::exception& e) { |
107 | 0 | return Status::InternalError("convert {} failed, {}", param_name, e.what()); |
108 | 0 | } |
109 | 2.99k | } |
110 | | |
111 | 2.99k | return Status::OK(); |
112 | 2.99k | } |
113 | | |
114 | | // for viewing the compaction status |
115 | 2.51k | Status CloudCompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) { |
116 | 2.51k | uint64_t tablet_id = 0; |
117 | 2.51k | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
118 | 2.51k | "check param failed"); |
119 | 2.51k | if (tablet_id == 0) { |
120 | 0 | return Status::InternalError("check param failed: missing tablet_id"); |
121 | 0 | } |
122 | | |
123 | 2.51k | LOG(INFO) << "begin to handle show compaction, tablet id: " << tablet_id; |
124 | | |
125 | | //TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
126 | 2.51k | CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id)); |
127 | 2.51k | if (tablet == nullptr) { |
128 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
129 | 0 | } |
130 | | |
131 | 2.51k | tablet->get_compaction_status(json_result); |
132 | 2.51k | LOG(INFO) << "finished to handle show compaction, tablet id: " << tablet_id; |
133 | 2.51k | return Status::OK(); |
134 | 2.51k | } |
135 | | |
136 | 594 | Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) { |
137 | | // 1. param check |
138 | | // check req_tablet_id or req_table_id is not empty and can not be set together. |
139 | 594 | uint64_t tablet_id = 0; |
140 | 594 | uint64_t table_id = 0; |
141 | 594 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed"); |
142 | 594 | LOG(INFO) << "begin to handle run compaction, tablet id: " << tablet_id |
143 | 594 | << " table id: " << table_id; |
144 | | |
145 | | // check compaction_type equals 'base' or 'cumulative' |
146 | 594 | auto& compaction_type = req->param(PARAM_COMPACTION_TYPE); |
147 | 594 | if (compaction_type != PARAM_COMPACTION_BASE && |
148 | 594 | compaction_type != PARAM_COMPACTION_CUMULATIVE && |
149 | 594 | compaction_type != PARAM_COMPACTION_FULL) { |
150 | 0 | return Status::NotSupported("The compaction type '{}' is not supported", compaction_type); |
151 | 0 | } |
152 | 594 | bool sync_delete_bitmap = compaction_type != PARAM_COMPACTION_FULL; |
153 | 594 | CloudTabletSPtr tablet = |
154 | 594 | DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id, false, sync_delete_bitmap)); |
155 | 494 | if (tablet == nullptr) { |
156 | 0 | return Status::NotFound("Tablet not found. tablet_id={}", tablet_id); |
157 | 0 | } |
158 | | |
159 | 494 | if (compaction_type == PARAM_COMPACTION_BASE) { |
160 | 1 | tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
161 | 493 | } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) { |
162 | 388 | tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
163 | 388 | } else if (compaction_type == PARAM_COMPACTION_FULL) { |
164 | 105 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
165 | 105 | } |
166 | | |
167 | 494 | LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id |
168 | 494 | << " table id: " << table_id; |
169 | | // 3. submit compaction task |
170 | 494 | RETURN_IF_ERROR(_engine.submit_compaction_task( |
171 | 494 | tablet, compaction_type == PARAM_COMPACTION_BASE ? CompactionType::BASE_COMPACTION |
172 | 494 | : compaction_type == PARAM_COMPACTION_CUMULATIVE |
173 | 494 | ? CompactionType::CUMULATIVE_COMPACTION |
174 | 494 | : CompactionType::FULL_COMPACTION)); |
175 | | |
176 | 494 | LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id |
177 | 347 | << " table id: " << table_id; |
178 | 347 | *json_result = |
179 | 347 | R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" + |
180 | 347 | std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}"; |
181 | 347 | return Status::OK(); |
182 | 494 | } |
183 | | |
184 | | Status CloudCompactionAction::_handle_run_status_compaction(HttpRequest* req, |
185 | 480 | std::string* json_result) { |
186 | 480 | uint64_t tablet_id = 0; |
187 | 480 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY), |
188 | 480 | "check param failed"); |
189 | 480 | LOG(INFO) << "begin to handle run status compaction, tablet id: " << tablet_id; |
190 | | |
191 | 480 | if (tablet_id == 0) { |
192 | | // overall compaction status |
193 | 0 | RETURN_IF_ERROR(_engine.get_compaction_status_json(json_result)); |
194 | 480 | } else { |
195 | 480 | std::string json_template = R"({ |
196 | 480 | "status" : "Success", |
197 | 480 | "run_status" : $0, |
198 | 480 | "msg" : "$1", |
199 | 480 | "tablet_id" : $2, |
200 | 480 | "compact_type" : "$3" |
201 | 480 | })"; |
202 | | |
203 | 480 | std::string msg = "compaction task for this tablet is not running"; |
204 | 480 | std::string compaction_type; |
205 | 480 | bool run_status = false; |
206 | | |
207 | 480 | if (_engine.has_cumu_compaction(tablet_id)) { |
208 | 55 | msg = "compaction task for this tablet is running"; |
209 | 55 | compaction_type = "cumulative"; |
210 | 55 | run_status = true; |
211 | 55 | *json_result = |
212 | 55 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
213 | 55 | return Status::OK(); |
214 | 55 | } |
215 | | |
216 | 425 | if (_engine.has_base_compaction(tablet_id)) { |
217 | 0 | msg = "compaction task for this tablet is running"; |
218 | 0 | compaction_type = "base"; |
219 | 0 | run_status = true; |
220 | 0 | *json_result = |
221 | 0 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
222 | 0 | return Status::OK(); |
223 | 0 | } |
224 | | |
225 | 425 | if (_engine.has_full_compaction(tablet_id)) { |
226 | 26 | msg = "compaction task for this tablet is running"; |
227 | 26 | compaction_type = "full"; |
228 | 26 | run_status = true; |
229 | 26 | *json_result = |
230 | 26 | absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
231 | 26 | return Status::OK(); |
232 | 26 | } |
233 | | // not running any compaction |
234 | 399 | *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type); |
235 | 399 | } |
236 | 480 | LOG(INFO) << "finished to handle run status compaction, tablet id: " << tablet_id; |
237 | 399 | return Status::OK(); |
238 | 480 | } |
239 | | |
240 | 3.58k | void CloudCompactionAction::handle(HttpRequest* req) { |
241 | 3.58k | req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str()); |
242 | | |
243 | 3.58k | if (_compaction_type == CompactionActionType::SHOW_INFO) { |
244 | 2.51k | std::string json_result; |
245 | 2.51k | Status st = _handle_show_compaction(req, &json_result); |
246 | 2.51k | if (!st.ok()) { |
247 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
248 | 2.51k | } else { |
249 | 2.51k | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
250 | 2.51k | } |
251 | 2.51k | } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) { |
252 | 594 | std::string json_result; |
253 | 594 | Status st = _handle_run_compaction(req, &json_result); |
254 | 594 | if (!st.ok()) { |
255 | 247 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
256 | 347 | } else { |
257 | 347 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
258 | 347 | } |
259 | 594 | } else { |
260 | 480 | std::string json_result; |
261 | 480 | Status st = _handle_run_status_compaction(req, &json_result); |
262 | 480 | if (!st.ok()) { |
263 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); |
264 | 480 | } else { |
265 | 480 | HttpChannel::send_reply(req, HttpStatus::OK, json_result); |
266 | 480 | } |
267 | 480 | } |
268 | 3.58k | } |
269 | | |
270 | | } // end namespace doris |