/root/doris/cloud/src/recycler/util.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "recycler/util.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <cstdint> |
23 | | |
24 | | #include "common/util.h" |
25 | | #include "meta-service/keys.h" |
26 | | #include "meta-service/meta_service_schema.h" |
27 | | #include "meta-service/txn_kv.h" |
28 | | #include "meta-service/txn_kv_error.h" |
29 | | |
30 | | namespace doris::cloud { |
namespace config {
// Recycle-job lease length in milliseconds: the functions below add it to the
// current epoch-millisecond timestamp to compute a job's expiration time.
// Only declared here (extern).
extern int32_t recycle_job_lease_expired_ms;
} // namespace config
34 | | |
35 | 10 | int get_all_instances(TxnKv* txn_kv, std::vector<InstanceInfoPB>& res) { |
36 | 10 | InstanceKeyInfo key0_info {""}; |
37 | 10 | InstanceKeyInfo key1_info {"\xff"}; // instance id are human readable strings |
38 | 10 | std::string key0; |
39 | 10 | std::string key1; |
40 | 10 | instance_key(key0_info, &key0); |
41 | 10 | instance_key(key1_info, &key1); |
42 | | |
43 | 10 | std::unique_ptr<Transaction> txn; |
44 | 10 | TxnErrorCode err = txn_kv->create_txn(&txn); |
45 | 10 | if (err != TxnErrorCode::TXN_OK) { |
46 | 0 | LOG(INFO) << "failed to init txn, err=" << err; |
47 | 0 | return -1; |
48 | 0 | } |
49 | | |
50 | 10 | std::unique_ptr<RangeGetIterator> it; |
51 | 10 | do { |
52 | 10 | TxnErrorCode err = txn->get(key0, key1, &it); |
53 | 10 | if (err != TxnErrorCode::TXN_OK) { |
54 | 0 | LOG(WARNING) << "failed to get instance, err=" << err; |
55 | 0 | return -1; |
56 | 0 | } |
57 | | |
58 | 100 | while (it->has_next()) { |
59 | 90 | auto [k, v] = it->next(); |
60 | 90 | if (!it->has_next()) key0 = k; |
61 | | |
62 | 90 | InstanceInfoPB instance_info; |
63 | 90 | if (!instance_info.ParseFromArray(v.data(), v.size())) { |
64 | 0 | LOG(WARNING) << "malformed instance info, key=" << hex(k); |
65 | 0 | return -1; |
66 | 0 | } |
67 | 90 | res.push_back(std::move(instance_info)); |
68 | 90 | } |
69 | 10 | key0.push_back('\x00'); // Update to next smallest key for iteration |
70 | 10 | } while (it->more()); |
71 | | |
72 | 10 | return 0; |
73 | 10 | } |
74 | | |
75 | | int prepare_instance_recycle_job(TxnKv* txn_kv, std::string_view key, |
76 | | const std::string& instance_id, const std::string& ip_port, |
77 | 60 | int64_t interval_ms) { |
78 | 60 | std::string val; |
79 | 60 | std::unique_ptr<Transaction> txn; |
80 | 60 | TxnErrorCode err = txn_kv->create_txn(&txn); |
81 | 60 | if (err != TxnErrorCode::TXN_OK) { |
82 | 0 | LOG(WARNING) << "failed to create txn"; |
83 | 0 | return -1; |
84 | 0 | } |
85 | 60 | err = txn->get(key, &val); |
86 | 60 | if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { |
87 | 0 | LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key); |
88 | 0 | return -1; |
89 | 0 | } |
90 | 60 | using namespace std::chrono; |
91 | 60 | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
92 | 60 | JobRecyclePB job_info; |
93 | | |
94 | 60 | auto is_expired = [&]() { |
95 | 40 | if (!job_info.ParseFromString(val)) { |
96 | 0 | LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key); |
97 | | // if failed to parse, just recycle it. |
98 | 0 | return true; |
99 | 0 | } |
100 | 40 | DCHECK(job_info.instance_id() == instance_id); |
101 | 40 | if (job_info.status() == JobRecyclePB::BUSY) { |
102 | 40 | if (now < job_info.expiration_time_ms()) { |
103 | 40 | LOG(INFO) << "job is busy. host=" << job_info.ip_port() |
104 | 40 | << " expiration=" << job_info.expiration_time_ms() |
105 | 40 | << " instance=" << instance_id; |
106 | 40 | return false; |
107 | 40 | } |
108 | 40 | } |
109 | | |
110 | 0 | bool finish_expired = now - job_info.last_ctime_ms() > interval_ms; |
111 | 0 | if (!finish_expired) { |
112 | 0 | LOG(INFO) << "the time since last finished job is too short. host=" |
113 | 0 | << job_info.ip_port() << " ctime=" << job_info.last_ctime_ms() |
114 | 0 | << " instance=" << instance_id; |
115 | 0 | } |
116 | |
|
117 | 0 | return finish_expired; |
118 | 40 | }; |
119 | | |
120 | 60 | if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || is_expired()) { |
121 | 20 | job_info.set_status(JobRecyclePB::BUSY); |
122 | 20 | job_info.set_instance_id(instance_id); |
123 | 20 | job_info.set_ip_port(ip_port); |
124 | 20 | job_info.set_expiration_time_ms(now + config::recycle_job_lease_expired_ms); |
125 | 20 | val = job_info.SerializeAsString(); |
126 | 20 | txn->put(key, val); |
127 | 20 | err = txn->commit(); |
128 | 20 | if (err != TxnErrorCode::TXN_OK) { |
129 | 0 | LOG(WARNING) << "failed to commit, err=" << err << " key=" << hex(key); |
130 | 0 | return -1; |
131 | 0 | } |
132 | 20 | return 0; |
133 | 20 | } |
134 | 40 | return 1; |
135 | 60 | } |
136 | | |
137 | | void finish_instance_recycle_job(TxnKv* txn_kv, std::string_view key, |
138 | | const std::string& instance_id, const std::string& ip_port, |
139 | 20 | bool success, int64_t ctime_ms) { |
140 | 20 | std::string val; |
141 | 20 | int retry_times = 0; |
142 | 20 | do { |
143 | 20 | std::unique_ptr<Transaction> txn; |
144 | 20 | TxnErrorCode err = txn_kv->create_txn(&txn); |
145 | 20 | if (err != TxnErrorCode::TXN_OK) { |
146 | 0 | LOG(WARNING) << "failed to create txn"; |
147 | 0 | return; |
148 | 0 | } |
149 | 20 | err = txn->get(key, &val); |
150 | 20 | if (err != TxnErrorCode::TXN_OK) { |
151 | 0 | LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key); |
152 | 0 | return; |
153 | 0 | } |
154 | | |
155 | 20 | using namespace std::chrono; |
156 | 20 | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
157 | 20 | JobRecyclePB job_info; |
158 | 20 | if (!job_info.ParseFromString(val)) { |
159 | 0 | LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key); |
160 | 0 | return; |
161 | 0 | } |
162 | 20 | DCHECK(job_info.instance_id() == instance_id); |
163 | 20 | if (job_info.ip_port() != ip_port) { |
164 | 0 | LOG(WARNING) << "job is doing at other machine: " << job_info.ip_port() |
165 | 0 | << " key=" << hex(key); |
166 | 0 | return; |
167 | 0 | } |
168 | 20 | if (job_info.status() != JobRecyclePB::BUSY) { |
169 | 0 | LOG(WARNING) << "job is not busy, key=" << hex(key); |
170 | 0 | return; |
171 | 0 | } |
172 | 20 | job_info.set_status(JobRecyclePB::IDLE); |
173 | 20 | job_info.set_instance_id(instance_id); |
174 | 20 | job_info.set_last_finish_time_ms(now); |
175 | 20 | job_info.set_last_ctime_ms(ctime_ms); |
176 | 20 | if (success) { |
177 | 20 | job_info.set_last_success_time_ms(now); |
178 | 20 | } |
179 | 20 | val = job_info.SerializeAsString(); |
180 | 20 | txn->put(key, val); |
181 | 20 | err = txn->commit(); |
182 | 20 | if (err == TxnErrorCode::TXN_OK) { |
183 | 20 | LOG(INFO) << "succ to commit to finish recycle job, key=" << hex(key); |
184 | 20 | return; |
185 | 20 | } |
186 | | // maybe conflict with the commit of the leased thread |
187 | 0 | LOG(WARNING) << "failed to commit to finish recycle job, err=" << err << " key=" << hex(key) |
188 | 0 | << " retry_times=" << retry_times; |
189 | 0 | } while (retry_times++ < 3); |
190 | 0 | LOG(WARNING) << "finally failed to commit to finish recycle job, key=" << hex(key); |
191 | 0 | } |
192 | | |
193 | | int lease_instance_recycle_job(TxnKv* txn_kv, std::string_view key, const std::string& instance_id, |
194 | 60 | const std::string& ip_port) { |
195 | 60 | std::string val; |
196 | 60 | std::unique_ptr<Transaction> txn; |
197 | 60 | TxnErrorCode err = txn_kv->create_txn(&txn); |
198 | 60 | if (err != TxnErrorCode::TXN_OK) { |
199 | 0 | LOG(WARNING) << "failed to create txn"; |
200 | 0 | return -1; |
201 | 0 | } |
202 | 60 | err = txn->get(key, &val); |
203 | 60 | if (err != TxnErrorCode::TXN_OK) { |
204 | 0 | LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key); |
205 | 0 | return -1; |
206 | 0 | } |
207 | | |
208 | 60 | using namespace std::chrono; |
209 | 60 | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
210 | 60 | JobRecyclePB job_info; |
211 | 60 | if (!job_info.ParseFromString(val)) { |
212 | 0 | LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key); |
213 | 0 | return 1; |
214 | 0 | } |
215 | 60 | DCHECK(job_info.instance_id() == instance_id); |
216 | 60 | if (job_info.ip_port() != ip_port) { |
217 | 0 | LOG(WARNING) << "job is doing at other machine: " << job_info.ip_port() |
218 | 0 | << " key=" << hex(key); |
219 | 0 | return 1; |
220 | 0 | } |
221 | 60 | if (job_info.status() != JobRecyclePB::BUSY) { |
222 | 0 | LOG(WARNING) << "job is not busy, key=" << hex(key); |
223 | 0 | return 1; |
224 | 0 | } |
225 | 60 | job_info.set_expiration_time_ms(now + config::recycle_job_lease_expired_ms); |
226 | 60 | val = job_info.SerializeAsString(); |
227 | 60 | txn->put(key, val); |
228 | 60 | err = txn->commit(); |
229 | 60 | if (err != TxnErrorCode::TXN_OK) { |
230 | 0 | LOG(WARNING) << "failed to commit, failed to lease recycle job, err=" << err |
231 | 0 | << " key=" << hex(key); |
232 | 0 | return -1; |
233 | 0 | } |
234 | 60 | return 0; |
235 | 60 | } |
236 | | |
237 | | int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id, |
238 | 65 | TabletIndexPB& tablet_idx) { |
239 | 65 | std::unique_ptr<Transaction> txn; |
240 | 65 | TxnErrorCode err = txn_kv->create_txn(&txn); |
241 | 65 | if (err != TxnErrorCode::TXN_OK) { |
242 | 0 | LOG(WARNING) << "failed to create txn"; |
243 | 0 | return -1; |
244 | 0 | } |
245 | | |
246 | 65 | std::string key, val; |
247 | 65 | meta_tablet_idx_key({instance_id, tablet_id}, &key); |
248 | 65 | err = txn->get(key, &val); |
249 | 65 | if (err != TxnErrorCode::TXN_OK) { |
250 | 0 | LOG(WARNING) << fmt::format("failed to get tablet_idx, err={} tablet_id={} key={}", err, |
251 | 0 | tablet_id, hex(key)); |
252 | 0 | return -1; |
253 | 0 | } |
254 | 65 | if (!tablet_idx.ParseFromString(val)) [[unlikely]] { |
255 | 0 | LOG(WARNING) << fmt::format("malformed tablet index value, tablet_id={} key={}", tablet_id, |
256 | 0 | hex(key)); |
257 | 0 | return -1; |
258 | 0 | } |
259 | 65 | if (tablet_id != tablet_idx.tablet_id()) [[unlikely]] { |
260 | 0 | LOG(WARNING) << "unexpected error given_tablet_id=" << tablet_id |
261 | 0 | << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key); |
262 | 0 | return -1; |
263 | 0 | } |
264 | 65 | return 0; |
265 | 65 | } |
266 | | |
267 | | int get_tablet_meta(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id, |
268 | 65 | TabletMetaCloudPB& tablet_meta) { |
269 | 65 | TabletIndexPB tablet_idx; |
270 | 65 | int ret = get_tablet_idx(txn_kv, instance_id, tablet_id, tablet_idx); |
271 | 65 | if (ret < 0) { |
272 | 0 | return ret; |
273 | 0 | } |
274 | | |
275 | 65 | std::unique_ptr<Transaction> txn; |
276 | 65 | TxnErrorCode err = txn_kv->create_txn(&txn); |
277 | 65 | if (err != TxnErrorCode::TXN_OK) { |
278 | 0 | LOG(WARNING) << "failed to create txn"; |
279 | 0 | return -1; |
280 | 0 | } |
281 | | |
282 | 65 | std::string key, val; |
283 | 65 | meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), |
284 | 65 | tablet_idx.partition_id(), tablet_id}, |
285 | 65 | &key); |
286 | 65 | err = txn->get(key, &val); |
287 | 65 | if (err != TxnErrorCode::TXN_OK) { |
288 | 0 | LOG(WARNING) << fmt::format( |
289 | 0 | "failed to get tablet, err={}, table_id={}, index_id={}, partition_id={}, " |
290 | 0 | "tablet_id={} key={}", |
291 | 0 | err, tablet_idx.table_id(), tablet_idx.index_id(), tablet_idx.partition_id(), |
292 | 0 | tablet_id, hex(key)); |
293 | 0 | return -1; |
294 | 0 | } |
295 | 65 | if (!tablet_meta.ParseFromString(val)) [[unlikely]] { |
296 | 0 | LOG(WARNING) << fmt::format("malformed tablet meta, tablet_id={} key={}", tablet_id, |
297 | 0 | hex(key)); |
298 | 0 | return -1; |
299 | 0 | } |
300 | 65 | return 0; |
301 | 65 | } |
302 | | } // namespace doris::cloud |