Coverage Report

Created: 2025-05-23 10:28

/root/doris/cloud/src/common/util.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// clang-format off
19
#include "util.h"
20
21
#include <bthread/butex.h>
22
#include <butil/iobuf.h>
23
#include <google/protobuf/util/json_util.h>
24
25
// FIXME: we should not rely on other modules that may rely on this common module
26
#include "common/logging.h"
27
#include "meta-service/keys.h"
28
#include "meta-service/codec.h"
29
#include "meta-service/txn_kv.h"
30
#include "meta-service/txn_kv_error.h"
31
32
#include <iomanip>
33
#include <sstream>
34
#include <unordered_map>
35
#include <variant>
36
// clang-format on
37
38
namespace doris::cloud {
39
40
/**
41
 * This is a naïve implementation of hex; DO NOT use it on the critical path.
42
 */
43
138k
std::string hex(std::string_view str) {
44
138k
    std::stringstream ss;
45
10.5M
    for (auto& i : str) {
46
10.5M
        ss << std::hex << std::setw(2) << std::setfill('0') << ((int16_t)i & 0xff);
47
10.5M
    }
48
138k
    return ss.str();
49
138k
}
50
51
/**
52
 * This is a naïve implementation of unhex.
53
 */
54
589
std::string unhex(std::string_view hex_str) {
55
    // clang-format off
56
589
    const static std::unordered_map<char, char> table = {
57
589
            {'0', 0},  {'1', 1},  {'2', 2},  {'3', 3},  {'4', 4}, 
58
589
            {'5', 5},  {'6', 6},  {'7', 7},  {'8', 8},  {'9', 9}, 
59
589
            {'a', 10}, {'b', 11}, {'c', 12}, {'d', 13}, {'e', 14}, {'f', 15},
60
589
            {'A', 10}, {'B', 11}, {'C', 12}, {'D', 13}, {'E', 14}, {'F', 15}};
61
589
    [[maybe_unused]] static int8_t lut[std::max({'9', 'f', 'F'}) + 1];
62
589
    lut[(int)'0'] = 0; lut[(int)'1'] = 1; lut[(int)'2'] = 2; lut[(int)'3'] = 3; lut[(int)'4'] = 4; lut[(int)'5'] = 5; lut[(int)'6'] = 6; lut[(int)'7'] = 7; lut[(int)'8'] = 8; lut[(int)'9'] = 9;
63
589
    lut[(int)'a'] = 10; lut[(int)'b'] = 11; lut[(int)'c'] = 12; lut[(int)'d'] = 13; lut[(int)'e'] = 14; lut[(int)'f'] = 15;
64
589
    lut[(int)'A'] = 10; lut[(int)'B'] = 11; lut[(int)'C'] = 12; lut[(int)'D'] = 13; lut[(int)'E'] = 14; lut[(int)'F'] = 15;
65
    // clang-format on
66
589
    size_t len = hex_str.length();
67
589
    len &= ~0x01UL;
68
589
    std::string buf(len >> 1, '\0');
69
13.4k
    for (size_t i = 0; i < len; ++i) {
70
12.8k
        const auto it = table.find(hex_str[i]);
71
12.8k
        if (it == table.end()) break;
72
12.8k
        buf[i >> 1] |= i & 0x1 ? (it->second & 0x0f) : (it->second & 0x0f) << 4;
73
12.8k
    }
74
589
    return buf;
75
589
}
76
77
static std::string explain_fields(std::string_view text, const std::vector<std::string>& fields,
78
38
                                  const std::vector<int>& pos, bool unicode = false) {
79
38
    if (fields.size() != pos.size() || fields.size() == 0 || pos.size() == 0) {
80
0
        return std::string(text.data(), text.size());
81
0
    }
82
38
    size_t last_hyphen_pos = pos.back() + 1;
83
38
    std::stringstream ss;
84
38
    std::string blank_line(last_hyphen_pos + 1, ' ');
85
86
    // clang-format off
87
38
    static const std::string hyphen("\xe2\x94\x80"); // ─ e2 94 80
88
38
    static const std::string bar   ("\xe2\x94\x82"); // │ e2 94 82
89
38
    static const std::string angle ("\xe2\x94\x8c"); // ┌ e2 94 8c
90
38
    static const std::string arrow ("\xe2\x96\xbc"); // ▼ e2 96 bc
91
    // clang-format on
92
93
    // Each line with hyphens
94
260
    for (size_t i = 0; i < fields.size(); ++i) {
95
222
        std::string line = blank_line;
96
222
        line[pos[i]] = '/';
97
222
        int nbar = i;
98
800
        for (size_t j = 0; j < i; ++j) {
99
578
            line[pos[j]] = '|';
100
578
        }
101
222
        int nhyphen = 0;
102
12.3k
        for (size_t j = pos[i] + 1; j <= last_hyphen_pos; ++j) {
103
12.1k
            line[j] = '-';
104
12.1k
            ++nhyphen;
105
12.1k
        }
106
107
222
        if (unicode) {
108
204
            int i = line.size();
109
204
            line.resize(line.size() + 2 * (1 /*angle*/ + nbar + nhyphen), ' ');
110
204
            int j = line.size();
111
20.0k
            while (--i >= 0) {
112
19.8k
                if (line[i] == '-') {
113
11.1k
                    line[--j] = hyphen[2];
114
11.1k
                    line[--j] = hyphen[1];
115
11.1k
                    line[--j] = hyphen[0];
116
11.1k
                } else if (line[i] == '|') {
117
533
                    line[--j] = bar[2];
118
533
                    line[--j] = bar[1];
119
533
                    line[--j] = bar[0];
120
8.09k
                } else if (line[i] == '/') {
121
204
                    line[--j] = angle[2];
122
204
                    line[--j] = angle[1];
123
204
                    line[--j] = angle[0];
124
7.89k
                } else {
125
7.89k
                    --j;
126
7.89k
                    continue;
127
7.89k
                }
128
11.9k
                line[i] = i != j ? ' ' : line[i]; // Replace if needed
129
11.9k
            }
130
204
        }
131
132
222
        ss << line << " " << i << ". " << fields[i] << "\n";
133
222
    }
134
135
    // Mark position indicator
136
38
    std::string line = blank_line;
137
260
    for (size_t i = 0; i < fields.size(); ++i) {
138
222
        line[pos[i]] = '|';
139
222
    }
140
141
38
    if (unicode) {
142
35
        int i = line.size();
143
35
        line.resize(line.size() + 2 * fields.size(), ' ');
144
35
        int j = line.size();
145
3.17k
        while (--i >= 0) {
146
3.13k
            if (line[i] != '|') {
147
2.93k
                --j;
148
2.93k
                continue;
149
2.93k
            }
150
204
            line[--j] = bar[2];
151
204
            line[--j] = bar[1];
152
204
            line[--j] = bar[0];
153
204
            line[i] = i != j ? ' ' : line[i]; // Replace if needed
154
204
        }
155
35
    }
156
157
38
    ss << line << "\n";
158
159
38
    line = blank_line;
160
260
    for (size_t i = 0; i < fields.size(); ++i) {
161
222
        line[pos[i]] = 'v';
162
222
    }
163
164
38
    if (unicode) {
165
35
        int i = line.size();
166
35
        line.resize(line.size() + 2 * fields.size(), ' ');
167
35
        int j = line.size();
168
3.17k
        while (--i >= 0) {
169
3.13k
            if (line[i] != 'v') {
170
2.93k
                --j;
171
2.93k
                continue;
172
2.93k
            }
173
204
            line[--j] = arrow[2];
174
204
            line[--j] = arrow[1];
175
204
            line[--j] = arrow[0];
176
204
            line[i] = i != j ? ' ' : line[i]; // Replace if needed
177
204
        }
178
35
    }
179
180
38
    ss << line << "\n";
181
182
    // Original text to explain
183
38
    ss << text << "\n";
184
185
38
    return ss.str();
186
38
}
187
188
38
std::string prettify_key(std::string_view key_hex, bool unicode) {
189
    // Decoded result container
190
    //                                    val                  tag  pos
191
    //                     .---------------^----------------.  .^.  .^.
192
38
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> fields;
193
38
    std::string unhex_key = unhex(key_hex);
194
38
    int key_space = unhex_key[0];
195
38
    std::string_view key_copy = unhex_key;
196
38
    key_copy.remove_prefix(1); // Remove the first key space byte
197
38
    int ret = decode_key(&key_copy, &fields);
198
38
    if (ret != 0) return "";
199
200
38
    std::vector<std::string> fields_str;
201
38
    std::vector<int> fields_pos;
202
38
    fields_str.reserve(fields.size() + 1);
203
38
    fields_pos.reserve(fields.size() + 1);
204
    // Key space byte
205
38
    fields_str.push_back("key space: " + std::to_string(key_space));
206
38
    fields_pos.push_back(0);
207
208
184
    for (auto& i : fields) {
209
184
        fields_str.emplace_back(std::get<1>(i) == EncodingTag::BYTES_TAG
210
184
                                        ? std::get<std::string>(std::get<0>(i))
211
184
                                        : std::to_string(std::get<int64_t>(std::get<0>(i))));
212
184
        fields_pos.push_back((std::get<2>(i) + 1) * 2);
213
184
    }
214
215
38
    return explain_fields(key_hex, fields_str, fields_pos, unicode);
216
38
}
217
218
1.19k
std::string proto_to_json(const ::google::protobuf::Message& msg, bool add_whitespace) {
219
1.19k
    std::string json;
220
1.19k
    google::protobuf::util::JsonPrintOptions opts;
221
1.19k
    opts.add_whitespace = add_whitespace;
222
1.19k
    opts.preserve_proto_field_names = true;
223
1.19k
    google::protobuf::util::MessageToJsonString(msg, &json, opts);
224
1.19k
    return json;
225
1.19k
}
226
227
100k
std::vector<std::string_view> split_string(const std::string_view& str, int n) {
228
100k
    std::vector<std::string_view> substrings;
229
230
204k
    for (size_t i = 0; i < str.size(); i += n) {
231
104k
        substrings.push_back(str.substr(i, n));
232
104k
    }
233
234
100k
    return substrings;
235
100k
}
236
237
6.76k
bool ValueBuf::to_pb(google::protobuf::Message* pb) const {
238
6.76k
    butil::IOBuf merge;
239
6.79k
    for (auto&& it : iters) {
240
6.79k
        it->reset();
241
20.1k
        while (it->has_next()) {
242
13.3k
            auto [k, v] = it->next();
243
13.3k
            merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
244
13.3k
        }
245
6.79k
    }
246
6.76k
    butil::IOBufAsZeroCopyInputStream merge_stream(merge);
247
6.76k
    return pb->ParseFromZeroCopyStream(&merge_stream);
248
6.76k
}
249
250
1
std::string ValueBuf::value() const {
251
1
    butil::IOBuf merge;
252
1
    for (auto&& it : iters) {
253
1
        it->reset();
254
2
        while (it->has_next()) {
255
1
            auto [k, v] = it->next();
256
1
            merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
257
1
        }
258
1
    }
259
1
    return merge.to_string();
260
1
}
261
262
2
std::vector<std::string> ValueBuf::keys() const {
263
2
    std::vector<std::string> ret;
264
2
    for (auto&& it : iters) {
265
2
        it->reset();
266
4
        while (it->has_next()) {
267
2
            auto [k, _] = it->next();
268
2
            ret.push_back({k.data(), k.size()});
269
2
        }
270
2
    }
271
2
    return ret;
272
2
}
273
274
3
void ValueBuf::remove(Transaction* txn) const {
275
35
    for (auto&& it : iters) {
276
35
        it->reset();
277
3.32k
        while (it->has_next()) {
278
3.29k
            txn->remove(it->next().first);
279
3.29k
        }
280
35
    }
281
3
}
282
283
7.29k
TxnErrorCode ValueBuf::get(Transaction* txn, std::string_view key, bool snapshot) {
284
7.29k
    iters.clear();
285
7.29k
    ver = -1;
286
287
7.29k
    std::string begin_key {key};
288
7.29k
    std::string end_key {key};
289
7.29k
    encode_int64(INT64_MAX, &end_key);
290
7.29k
    std::unique_ptr<RangeGetIterator> it;
291
7.29k
    TxnErrorCode err = txn->get(begin_key, end_key, &it, snapshot);
292
7.29k
    if (err != TxnErrorCode::TXN_OK) {
293
24
        return err;
294
24
    }
295
7.27k
    if (!it->has_next()) {
296
504
        return TxnErrorCode::TXN_KEY_NOT_FOUND;
297
504
    }
298
    // Extract version
299
6.76k
    auto [k, _] = it->next();
300
6.76k
    if (k.size() == key.size()) { // Old version KV
301
92
        DCHECK(k == key) << hex(k) << ' ' << hex(key);
302
92
        DCHECK_EQ(it->size(), 1) << hex(k) << ' ' << hex(key);
303
92
        ver = 0;
304
6.67k
    } else {
305
6.67k
        k.remove_prefix(key.size());
306
6.67k
        int64_t suffix;
307
6.67k
        if (decode_int64(&k, &suffix) != 0) [[unlikely]] {
308
0
            LOG_WARNING("failed to decode key").tag("key", hex(k));
309
0
            return TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
310
0
        }
311
6.67k
        ver = suffix >> 56 & 0xff;
312
6.67k
    }
313
6.76k
    bool more = it->more();
314
6.76k
    if (!more) {
315
6.76k
        iters.push_back(std::move(it));
316
6.76k
        return TxnErrorCode::TXN_OK;
317
6.76k
    }
318
1
    begin_key = it->next_begin_key();
319
1
    iters.push_back(std::move(it));
320
32
    do {
321
32
        err = txn->get(begin_key, end_key, &it, snapshot);
322
32
        if (err != TxnErrorCode::TXN_OK) {
323
0
            return err;
324
0
        }
325
32
        more = it->more();
326
32
        if (more) {
327
31
            begin_key = it->next_begin_key();
328
31
        }
329
32
        iters.push_back(std::move(it));
330
32
    } while (more);
331
1
    return TxnErrorCode::TXN_OK;
332
1
}
333
334
7.29k
TxnErrorCode get(Transaction* txn, std::string_view key, ValueBuf* val, bool snapshot) {
335
7.29k
    return val->get(txn, key, snapshot);
336
7.29k
}
337
338
7.19k
TxnErrorCode key_exists(Transaction* txn, std::string_view key, bool snapshot) {
339
7.19k
    std::string end_key {key};
340
7.19k
    encode_int64(INT64_MAX, &end_key);
341
7.19k
    std::unique_ptr<RangeGetIterator> it;
342
7.19k
    TxnErrorCode err = txn->get(key, end_key, &it, snapshot, 1);
343
7.19k
    if (err != TxnErrorCode::TXN_OK) {
344
120
        return err;
345
120
    }
346
7.07k
    return it->has_next() ? TxnErrorCode::TXN_OK : TxnErrorCode::TXN_KEY_NOT_FOUND;
347
7.19k
}
348
349
void put(Transaction* txn, std::string_view key, const google::protobuf::Message& pb, uint8_t ver,
350
131
         size_t split_size) {
351
131
    std::string value;
352
131
    bool ret = pb.SerializeToString(&value); // Always success
353
131
    DCHECK(ret) << hex(key) << ' ' << pb.ShortDebugString();
354
131
    put(txn, key, value, ver, split_size);
355
131
}
356
357
void put(Transaction* txn, std::string_view key, std::string_view value, uint8_t ver,
358
100k
         size_t split_size) {
359
100k
    auto split_vec = split_string(value, split_size);
360
100k
    int64_t suffix_base = ver;
361
100k
    suffix_base <<= 56;
362
204k
    for (size_t i = 0; i < split_vec.size(); ++i) {
363
104k
        std::string k(key);
364
104k
        encode_int64(suffix_base + i, &k);
365
104k
        txn->put(k, split_vec[i]);
366
104k
    }
367
100k
}
368
369
} // namespace doris::cloud