/root/doris/cloud/src/common/util.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // clang-format off |
19 | | #include "util.h" |
20 | | |
21 | | #include <bthread/butex.h> |
22 | | #include <butil/iobuf.h> |
23 | | #include <google/protobuf/util/json_util.h> |
24 | | |
25 | | // FIXME: we should not rely on other modules that may rely on this common module |
26 | | #include "common/logging.h" |
27 | | #include "meta-service/keys.h" |
28 | | #include "meta-service/codec.h" |
29 | | #include "meta-service/txn_kv.h" |
30 | | #include "meta-service/txn_kv_error.h" |
31 | | |
32 | | #include <iomanip> |
33 | | #include <sstream> |
34 | | #include <unordered_map> |
35 | | #include <variant> |
36 | | // clang-format on |
37 | | |
38 | | namespace doris::cloud { |
39 | | |
/**
 * Encodes every byte of `str` as two lowercase hexadecimal digits.
 *
 * The result is built with a digit lookup table instead of a
 * std::stringstream, so no stream state is touched per byte.
 *
 * @param str raw bytes to encode (may contain '\0' and bytes >= 0x80)
 * @return a string of length 2 * str.size(), e.g. "abc" -> "616263"
 */
std::string hex(std::string_view str) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string out;
    out.reserve(str.size() * 2);
    for (const char c : str) {
        // Go through unsigned char so bytes >= 0x80 are not sign-extended.
        const auto byte = static_cast<unsigned char>(c);
        out.push_back(kHexDigits[byte >> 4]);
        out.push_back(kHexDigits[byte & 0x0f]);
    }
    return out;
}
50 | | |
/**
 * Decodes a hexadecimal string (upper or lower case digits) into raw bytes.
 *
 * Behaviour on malformed input:
 *  - a trailing odd digit is ignored (only complete digit pairs are decoded);
 *  - decoding stops at the first non-hex character; the output keeps its full
 *    length of hex_str.length() / 2 and the undecoded bytes (or nibble) stay 0.
 *
 * @param hex_str text to decode
 * @return the decoded bytes, always of size hex_str.length() / 2
 */
std::string unhex(std::string_view hex_str) {
    // Value of a single hex digit, or -1 when `c` is not a hex digit.
    const auto nibble = [](char c) -> int {
        if (c >= '0' && c <= '9') return c - '0';
        if (c >= 'a' && c <= 'f') return c - 'a' + 10;
        if (c >= 'A' && c <= 'F') return c - 'A' + 10;
        return -1;
    };

    const size_t len = hex_str.length() & ~static_cast<size_t>(1); // drop a dangling odd digit
    std::string buf(len >> 1, '\0');
    for (size_t i = 0; i < len; ++i) {
        const int value = nibble(hex_str[i]);
        if (value < 0) break;
        // Even index -> high nibble, odd index -> low nibble of byte i / 2.
        buf[i >> 1] |= static_cast<char>((i & 0x1) ? value : value << 4);
    }
    return buf;
}
76 | | |
/**
 * Draws an annotated diagram that maps each field description to the column
 * of `text` where that field starts.
 *
 * Output layout (ASCII mode), one line per field followed by three more lines:
 *
 *   /--- 0. <fields[0]>
 *   | /- 1. <fields[1]>
 *   | |
 *   v v
 *   <text>
 *
 * In unicode mode the placeholders '/', '-', '|' and 'v' are rendered as the
 * UTF-8 box-drawing glyphs "┌", "─", "│" and "▼" (3 bytes each, one display
 * column each), so the diagram keeps its visual alignment.
 *
 * @param text    the text being explained; printed verbatim as the last line
 * @param fields  description of each field
 * @param pos     column of each field in `text`, parallel to `fields`;
 *                expected in ascending order, the last one being the rightmost
 * @param unicode render with UTF-8 box-drawing glyphs instead of ASCII
 * @return the diagram, or a plain copy of `text` when `fields`/`pos` are empty,
 *         differ in size, or contain a position outside the drawable range
 */
static std::string explain_fields(std::string_view text, const std::vector<std::string>& fields,
                                  const std::vector<int>& pos, bool unicode = false) {
    if (fields.size() != pos.size() || fields.empty()) {
        return std::string(text);
    }

    // Hyphens run up to one column past the last field position.
    const size_t last_hyphen_pos = static_cast<size_t>(pos.back()) + 1;
    // Every position is used as an index into a line of last_hyphen_pos + 1 columns;
    // refuse to draw rather than index out of bounds.
    for (const int p : pos) {
        if (p < 0 || static_cast<size_t>(p) > last_hyphen_pos) {
            return std::string(text);
        }
    }
    const std::string blank_line(last_hyphen_pos + 1, ' ');

    // clang-format off
    static const std::string hyphen("\xe2\x94\x80"); // ─ e2 94 80
    static const std::string bar   ("\xe2\x94\x82"); // │ e2 94 82
    static const std::string angle ("\xe2\x94\x8c"); // ┌ e2 94 8c
    static const std::string arrow ("\xe2\x96\xbc"); // ▼ e2 96 bc
    // clang-format on

    // Turns an ASCII diagram line into its final form: unchanged in ASCII mode,
    // otherwise every placeholder is replaced by its box-drawing glyph.
    const auto render = [unicode](const std::string& ascii_line) -> std::string {
        if (!unicode) return ascii_line;
        std::string rendered;
        rendered.reserve(ascii_line.size() * 3);
        for (const char c : ascii_line) {
            switch (c) {
            case '-': rendered += hyphen; break;
            case '|': rendered += bar; break;
            case '/': rendered += angle; break;
            case 'v': rendered += arrow; break;
            default: rendered += c; break;
            }
        }
        return rendered;
    };

    std::string out;

    // One line per field: bars for the fields to the left, an angle at this
    // field's column, then hyphens leading to the description.
    for (size_t i = 0; i < fields.size(); ++i) {
        std::string line = blank_line;
        line[pos[i]] = '/';
        for (size_t j = 0; j < i; ++j) {
            line[pos[j]] = '|';
        }
        for (size_t j = static_cast<size_t>(pos[i]) + 1; j <= last_hyphen_pos; ++j) {
            line[j] = '-';
        }
        out += render(line);
        out += " ";
        out += std::to_string(i);
        out += ". ";
        out += fields[i];
        out += "\n";
    }

    // Position indicator: a bar under every field column
    std::string line = blank_line;
    for (const int p : pos) {
        line[p] = '|';
    }
    out += render(line);
    out += "\n";

    // Arrow heads pointing at the text below
    line = blank_line;
    for (const int p : pos) {
        line[p] = 'v';
    }
    out += render(line);
    out += "\n";

    // Original text to explain
    out += text;
    out += "\n";

    return out;
}
187 | | |
188 | 38 | std::string prettify_key(std::string_view key_hex, bool unicode) { |
189 | | // Decoded result container |
190 | | // val tag pos |
191 | | // .---------------^----------------. .^. .^. |
192 | 38 | std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields; |
193 | 38 | std::string unhex_key = unhex(key_hex); |
194 | 38 | int key_space = unhex_key[0]; |
195 | 38 | std::string_view key_copy = unhex_key; |
196 | 38 | key_copy.remove_prefix(1); // Remove the first key space byte |
197 | 38 | int ret = decode_key(&key_copy, &fields); |
198 | 38 | if (ret != 0) return ""; |
199 | | |
200 | 38 | std::vector<std::string> fields_str; |
201 | 38 | std::vector<int> fields_pos; |
202 | 38 | fields_str.reserve(fields.size() + 1); |
203 | 38 | fields_pos.reserve(fields.size() + 1); |
204 | | // Key space byte |
205 | 38 | fields_str.push_back("key space: " + std::to_string(key_space)); |
206 | 38 | fields_pos.push_back(0); |
207 | | |
208 | 184 | for (auto& i : fields) { |
209 | 184 | fields_str.emplace_back(std::get<1>(i) == EncodingTag::BYTES_TAG |
210 | 184 | ? std::get<std::string>(std::get<0>(i)) |
211 | 184 | : std::to_string(std::get<int64_t>(std::get<0>(i)))); |
212 | 184 | fields_pos.push_back((std::get<2>(i) + 1) * 2); |
213 | 184 | } |
214 | | |
215 | 38 | return explain_fields(key_hex, fields_str, fields_pos, unicode); |
216 | 38 | } |
217 | | |
218 | 1.19k | std::string proto_to_json(const ::google::protobuf::Message& msg, bool add_whitespace) { |
219 | 1.19k | std::string json; |
220 | 1.19k | google::protobuf::util::JsonPrintOptions opts; |
221 | 1.19k | opts.add_whitespace = add_whitespace; |
222 | 1.19k | opts.preserve_proto_field_names = true; |
223 | 1.19k | google::protobuf::util::MessageToJsonString(msg, &json, opts); |
224 | 1.19k | return json; |
225 | 1.19k | } |
226 | | |
/**
 * Cuts `str` into consecutive views of at most `n` characters each.
 *
 * The returned views alias `str`'s underlying buffer; they are only valid
 * while that buffer is alive.
 *
 * @param str text to split
 * @param n   maximum length of each piece; a non-positive value means
 *            "do not split" and yields the whole input as a single piece
 * @return the pieces in order; empty when `str` is empty
 */
std::vector<std::string_view> split_string(const std::string_view& str, int n) {
    std::vector<std::string_view> substrings;
    if (str.empty()) {
        return substrings;
    }
    // A zero step would never advance the loop below; treat every
    // non-positive size as "one piece".
    if (n <= 0) {
        substrings.push_back(str);
        return substrings;
    }

    const auto piece_len = static_cast<size_t>(n);
    substrings.reserve((str.size() + piece_len - 1) / piece_len);
    for (size_t i = 0; i < str.size(); i += piece_len) {
        substrings.push_back(str.substr(i, piece_len));
    }

    return substrings;
}
236 | | |
237 | 6.76k | bool ValueBuf::to_pb(google::protobuf::Message* pb) const { |
238 | 6.76k | butil::IOBuf merge; |
239 | 6.79k | for (auto&& it : iters) { |
240 | 6.79k | it->reset(); |
241 | 20.1k | while (it->has_next()) { |
242 | 13.3k | auto [k, v] = it->next(); |
243 | 13.3k | merge.append_user_data((void*)v.data(), v.size(), +[](void*) {}); |
244 | 13.3k | } |
245 | 6.79k | } |
246 | 6.76k | butil::IOBufAsZeroCopyInputStream merge_stream(merge); |
247 | 6.76k | return pb->ParseFromZeroCopyStream(&merge_stream); |
248 | 6.76k | } |
249 | | |
250 | 1 | std::string ValueBuf::value() const { |
251 | 1 | butil::IOBuf merge; |
252 | 1 | for (auto&& it : iters) { |
253 | 1 | it->reset(); |
254 | 2 | while (it->has_next()) { |
255 | 1 | auto [k, v] = it->next(); |
256 | 1 | merge.append_user_data((void*)v.data(), v.size(), +[](void*) {}); |
257 | 1 | } |
258 | 1 | } |
259 | 1 | return merge.to_string(); |
260 | 1 | } |
261 | | |
262 | 2 | std::vector<std::string> ValueBuf::keys() const { |
263 | 2 | std::vector<std::string> ret; |
264 | 2 | for (auto&& it : iters) { |
265 | 2 | it->reset(); |
266 | 4 | while (it->has_next()) { |
267 | 2 | auto [k, _] = it->next(); |
268 | 2 | ret.push_back({k.data(), k.size()}); |
269 | 2 | } |
270 | 2 | } |
271 | 2 | return ret; |
272 | 2 | } |
273 | | |
274 | 3 | void ValueBuf::remove(Transaction* txn) const { |
275 | 35 | for (auto&& it : iters) { |
276 | 35 | it->reset(); |
277 | 3.32k | while (it->has_next()) { |
278 | 3.29k | txn->remove(it->next().first); |
279 | 3.29k | } |
280 | 35 | } |
281 | 3 | } |
282 | | |
283 | 7.29k | TxnErrorCode ValueBuf::get(Transaction* txn, std::string_view key, bool snapshot) { |
284 | 7.29k | iters.clear(); |
285 | 7.29k | ver = -1; |
286 | | |
287 | 7.29k | std::string begin_key {key}; |
288 | 7.29k | std::string end_key {key}; |
289 | 7.29k | encode_int64(INT64_MAX, &end_key); |
290 | 7.29k | std::unique_ptr<RangeGetIterator> it; |
291 | 7.29k | TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot); |
292 | 7.29k | if (err != TxnErrorCode::TXN_OK) { |
293 | 24 | return err; |
294 | 24 | } |
295 | 7.27k | if (!it->has_next()) { |
296 | 504 | return TxnErrorCode::TXN_KEY_NOT_FOUND; |
297 | 504 | } |
298 | | // Extract version |
299 | 6.76k | auto [k, _] = it->next(); |
300 | 6.76k | if (k.size() == key.size()) { // Old version KV |
301 | 92 | DCHECK(k == key) << hex(k) << ' ' << hex(key); |
302 | 92 | DCHECK_EQ(it->size(), 1) << hex(k) << ' ' << hex(key); |
303 | 92 | ver = 0; |
304 | 6.67k | } else { |
305 | 6.67k | k.remove_prefix(key.size()); |
306 | 6.67k | int64_t suffix; |
307 | 6.67k | if (decode_int64(&k, &suffix) != 0) [[unlikely]] { |
308 | 0 | LOG_WARNING("failed to decode key").tag("key", hex(k)); |
309 | 0 | return TxnErrorCode::TXN_UNIDENTIFIED_ERROR; |
310 | 0 | } |
311 | 6.67k | ver = suffix >> 56 & 0xff; |
312 | 6.67k | } |
313 | 6.76k | bool more = it->more(); |
314 | 6.76k | if (!more) { |
315 | 6.76k | iters.push_back(std::move(it)); |
316 | 6.76k | return TxnErrorCode::TXN_OK; |
317 | 6.76k | } |
318 | 1 | begin_key = it->next_begin_key(); |
319 | 1 | iters.push_back(std::move(it)); |
320 | 32 | do { |
321 | 32 | err = txn->get(begin_key, end_key, &it, snapshot); |
322 | 32 | if (err != TxnErrorCode::TXN_OK) { |
323 | 0 | return err; |
324 | 0 | } |
325 | 32 | more = it->more(); |
326 | 32 | if (more) { |
327 | 31 | begin_key = it->next_begin_key(); |
328 | 31 | } |
329 | 32 | iters.push_back(std::move(it)); |
330 | 32 | } while (more); |
331 | 1 | return TxnErrorCode::TXN_OK; |
332 | 1 | } |
333 | | |
334 | 7.29k | TxnErrorCode get(Transaction* txn, std::string_view key, ValueBuf* val, bool snapshot) { |
335 | 7.29k | return val->get(txn, key, snapshot); |
336 | 7.29k | } |
337 | | |
338 | 7.19k | TxnErrorCode key_exists(Transaction* txn, std::string_view key, bool snapshot) { |
339 | 7.19k | std::string end_key {key}; |
340 | 7.19k | encode_int64(INT64_MAX, &end_key); |
341 | 7.19k | std::unique_ptr<RangeGetIterator> it; |
342 | 7.19k | TxnErrorCode err = txn->get(key, end_key, &it, snapshot, 1); |
343 | 7.19k | if (err != TxnErrorCode::TXN_OK) { |
344 | 120 | return err; |
345 | 120 | } |
346 | 7.07k | return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND; |
347 | 7.19k | } |
348 | | |
349 | | void put(Transaction* txn, std::string_view key, const google::protobuf::Message& pb, uint8_t ver, |
350 | 131 | size_t split_size) { |
351 | 131 | std::string value; |
352 | 131 | bool ret = pb.SerializeToString(&value); // Always success |
353 | 131 | DCHECK(ret) << hex(key) << ' ' << pb.ShortDebugString(); |
354 | 131 | put(txn, key, value, ver, split_size); |
355 | 131 | } |
356 | | |
357 | | void put(Transaction* txn, std::string_view key, std::string_view value, uint8_t ver, |
358 | 100k | size_t split_size) { |
359 | 100k | auto split_vec = split_string(value, split_size); |
360 | 100k | int64_t suffix_base = ver; |
361 | 100k | suffix_base <<= 56; |
362 | 204k | for (size_t i = 0; i < split_vec.size(); ++i) { |
363 | 104k | std::string k(key); |
364 | 104k | encode_int64(suffix_base + i, &k); |
365 | 104k | txn->put(k, split_vec[i]); |
366 | 104k | } |
367 | 100k | } |
368 | | |
369 | | } // namespace doris::cloud |