Coverage Report

Created: 2025-05-22 18:31

/root/doris/cloud/src/recycler/util.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/util.h"
19
20
#include <glog/logging.h>
21
22
#include <cstdint>
23
24
#include "common/util.h"
25
#include "meta-service/keys.h"
26
#include "meta-service/meta_service_schema.h"
27
#include "meta-service/txn_kv.h"
28
#include "meta-service/txn_kv_error.h"
29
30
namespace doris::cloud {
31
namespace config {
// Lease length, in milliseconds, granted to a recycle job when it is claimed or
// re-leased. Only declared here; it is defined in another translation unit.
extern int32_t recycle_job_lease_expired_ms;
} // namespace config
34
35
10
// Reads every InstanceInfoPB stored between instance_key("") and
// instance_key("\xff") and appends them to `res`.
//
// The range read is paged. After each batch, the start key moves to the
// smallest key greater than the last key seen. Scanning continues while the
// iterator reports more data.
//
// Returns 0 on success. Returns -1 in any of these cases:
//   - the transaction cannot be created;
//   - a range read fails;
//   - a stored value cannot be parsed.
// On failure, `res` may already contain the instances decoded before the error.
int get_all_instances(TxnKv* txn_kv, std::vector<InstanceInfoPB>& res) {
    InstanceKeyInfo key0_info {""};
    InstanceKeyInfo key1_info {"\xff"}; // instance id are human readable strings
    std::string key0;
    std::string key1;
    instance_key(key0_info, &key0);
    instance_key(key1_info, &key1);

    std::unique_ptr<Transaction> txn;
    TxnErrorCode err = txn_kv->create_txn(&txn);
    if (err != TxnErrorCode::TXN_OK) {
        // Every other failure path in this file logs at WARNING; do the same here
        // so a broken txn backend is not hidden among INFO messages.
        LOG(WARNING) << "failed to init txn, err=" << err;
        return -1;
    }

    std::unique_ptr<RangeGetIterator> it;
    do {
        // Reuse the outer `err` rather than shadowing it with a new local.
        err = txn->get(key0, key1, &it);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get instance, err=" << err;
            return -1;
        }

        while (it->has_next()) {
            auto [k, v] = it->next();
            // The last key of this batch becomes the resume point for the next page.
            if (!it->has_next()) key0 = k;

            InstanceInfoPB instance_info;
            if (!instance_info.ParseFromArray(v.data(), v.size())) {
                LOG(WARNING) << "malformed instance info, key=" << hex(k);
                return -1;
            }
            res.push_back(std::move(instance_info));
        }
        key0.push_back('\x00'); // Update to next smallest key for iteration
    } while (it->more());

    return 0;
}
74
75
int prepare_instance_recycle_job(TxnKv* txn_kv, std::string_view key,
76
                                 const std::string& instance_id, const std::string& ip_port,
77
60
                                 int64_t interval_ms) {
78
60
    std::string val;
79
60
    std::unique_ptr<Transaction> txn;
80
60
    TxnErrorCode err = txn_kv->create_txn(&txn);
81
60
    if (err != TxnErrorCode::TXN_OK) {
82
0
        LOG(WARNING) << "failed to create txn";
83
0
        return -1;
84
0
    }
85
60
    err = txn->get(key, &val);
86
60
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
87
0
        LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key);
88
0
        return -1;
89
0
    }
90
60
    using namespace std::chrono;
91
60
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
92
60
    JobRecyclePB job_info;
93
94
60
    auto is_expired = [&]() {
95
40
        if (!job_info.ParseFromString(val)) {
96
0
            LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
97
            // if failed to parse, just recycle it.
98
0
            return true;
99
0
        }
100
40
        DCHECK(job_info.instance_id() == instance_id);
101
40
        if (job_info.status() == JobRecyclePB::BUSY) {
102
40
            if (now < job_info.expiration_time_ms()) {
103
40
                LOG(INFO) << "job is busy. host=" << job_info.ip_port()
104
40
                          << " expiration=" << job_info.expiration_time_ms()
105
40
                          << " instance=" << instance_id;
106
40
                return false;
107
40
            }
108
40
        }
109
110
0
        bool finish_expired = now - job_info.last_ctime_ms() > interval_ms;
111
0
        if (!finish_expired) {
112
0
            LOG(INFO) << "the time since last finished job is too short. host="
113
0
                      << job_info.ip_port() << " ctime=" << job_info.last_ctime_ms()
114
0
                      << " instance=" << instance_id;
115
0
        }
116
117
0
        return finish_expired;
118
40
    };
119
120
60
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || is_expired()) {
121
20
        job_info.set_status(JobRecyclePB::BUSY);
122
20
        job_info.set_instance_id(instance_id);
123
20
        job_info.set_ip_port(ip_port);
124
20
        job_info.set_expiration_time_ms(now + config::recycle_job_lease_expired_ms);
125
20
        val = job_info.SerializeAsString();
126
20
        txn->put(key, val);
127
20
        err = txn->commit();
128
20
        if (err != TxnErrorCode::TXN_OK) {
129
0
            LOG(WARNING) << "failed to commit, err=" << err << " key=" << hex(key);
130
0
            return -1;
131
0
        }
132
20
        return 0;
133
20
    }
134
40
    return 1;
135
60
}
136
137
void finish_instance_recycle_job(TxnKv* txn_kv, std::string_view key,
138
                                 const std::string& instance_id, const std::string& ip_port,
139
20
                                 bool success, int64_t ctime_ms) {
140
20
    std::string val;
141
20
    int retry_times = 0;
142
20
    do {
143
20
        std::unique_ptr<Transaction> txn;
144
20
        TxnErrorCode err = txn_kv->create_txn(&txn);
145
20
        if (err != TxnErrorCode::TXN_OK) {
146
0
            LOG(WARNING) << "failed to create txn";
147
0
            return;
148
0
        }
149
20
        err = txn->get(key, &val);
150
20
        if (err != TxnErrorCode::TXN_OK) {
151
0
            LOG(WARNING) << "failed to get kv, err=" << err << " key=" << hex(key);
152
0
            return;
153
0
        }
154
155
20
        using namespace std::chrono;
156
20
        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
157
20
        JobRecyclePB job_info;
158
20
        if (!job_info.ParseFromString(val)) {
159
0
            LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
160
0
            return;
161
0
        }
162
20
        DCHECK(job_info.instance_id() == instance_id);
163
20
        if (job_info.ip_port() != ip_port) {
164
0
            LOG(WARNING) << "job is doing at other machine: " << job_info.ip_port()
165
0
                         << " key=" << hex(key);
166
0
            return;
167
0
        }
168
20
        if (job_info.status() != JobRecyclePB::BUSY) {
169
0
            LOG(WARNING) << "job is not busy, key=" << hex(key);
170
0
            return;
171
0
        }
172
20
        job_info.set_status(JobRecyclePB::IDLE);
173
20
        job_info.set_instance_id(instance_id);
174
20
        job_info.set_last_finish_time_ms(now);
175
20
        job_info.set_last_ctime_ms(ctime_ms);
176
20
        if (success) {
177
20
            job_info.set_last_success_time_ms(now);
178
20
        }
179
20
        val = job_info.SerializeAsString();
180
20
        txn->put(key, val);
181
20
        err = txn->commit();
182
20
        if (err == TxnErrorCode::TXN_OK) {
183
20
            LOG(INFO) << "succ to commit to finish recycle job, key=" << hex(key);
184
20
            return;
185
20
        }
186
        // maybe conflict with the commit of the leased thread
187
0
        LOG(WARNING) << "failed to commit to finish recycle job, err=" << err << " key=" << hex(key)
188
0
                     << " retry_times=" << retry_times;
189
0
    } while (retry_times++ < 3);
190
0
    LOG(WARNING) << "finally failed to commit to finish recycle job, key=" << hex(key);
191
0
}
192
193
// Extends the lease of the recycle job at `key` held by `ip_port`.
//
// Returns:
//   0  the BUSY record owned by `ip_port` was re-leased until
//      now + config::recycle_job_lease_expired_ms;
//   1  the record cannot be parsed, belongs to another host, or is not BUSY
//      (nothing is written);
//   -1 creating the txn, reading the key, or committing failed.
int lease_instance_recycle_job(TxnKv* txn_kv, std::string_view key, const std::string& instance_id,
                               const std::string& ip_port) {
    std::string raw;
    std::unique_ptr<Transaction> txn;
    if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create txn";
        return -1;
    }
    if (TxnErrorCode rc = txn->get(key, &raw); rc != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to get kv, err=" << rc << " key=" << hex(key);
        return -1;
    }

    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::system_clock::now().time_since_epoch())
                                .count();

    JobRecyclePB job;
    if (!job.ParseFromString(raw)) {
        LOG(WARNING) << "failed to parse JobRecyclePB, key=" << hex(key);
        return 1;
    }
    DCHECK(job.instance_id() == instance_id);
    if (job.ip_port() != ip_port) {
        LOG(WARNING) << "job is doing at other machine: " << job.ip_port()
                     << " key=" << hex(key);
        return 1;
    }
    if (job.status() != JobRecyclePB::BUSY) {
        LOG(WARNING) << "job is not busy, key=" << hex(key);
        return 1;
    }

    // Push the lease deadline forward and persist it.
    job.set_expiration_time_ms(now_ms + config::recycle_job_lease_expired_ms);
    raw = job.SerializeAsString();
    txn->put(key, raw);
    if (TxnErrorCode rc = txn->commit(); rc != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to commit, failed to lease recycle job, err=" << rc
                     << " key=" << hex(key);
        return -1;
    }
    return 0;
}
236
237
int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
238
65
                   TabletIndexPB& tablet_idx) {
239
65
    std::unique_ptr<Transaction> txn;
240
65
    TxnErrorCode err = txn_kv->create_txn(&txn);
241
65
    if (err != TxnErrorCode::TXN_OK) {
242
0
        LOG(WARNING) << "failed to create txn";
243
0
        return -1;
244
0
    }
245
246
65
    std::string key, val;
247
65
    meta_tablet_idx_key({instance_id, tablet_id}, &key);
248
65
    err = txn->get(key, &val);
249
65
    if (err != TxnErrorCode::TXN_OK) {
250
0
        LOG(WARNING) << fmt::format("failed to get tablet_idx, err={} tablet_id={} key={}", err,
251
0
                                    tablet_id, hex(key));
252
0
        return -1;
253
0
    }
254
65
    if (!tablet_idx.ParseFromString(val)) [[unlikely]] {
255
0
        LOG(WARNING) << fmt::format("malformed tablet index value, tablet_id={} key={}", tablet_id,
256
0
                                    hex(key));
257
0
        return -1;
258
0
    }
259
65
    if (tablet_id != tablet_idx.tablet_id()) [[unlikely]] {
260
0
        LOG(WARNING) << "unexpected error given_tablet_id=" << tablet_id
261
0
                     << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key);
262
0
        return -1;
263
0
    }
264
65
    return 0;
265
65
}
266
267
int get_tablet_meta(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id,
268
65
                    TabletMetaCloudPB& tablet_meta) {
269
65
    TabletIndexPB tablet_idx;
270
65
    int ret = get_tablet_idx(txn_kv, instance_id, tablet_id, tablet_idx);
271
65
    if (ret < 0) {
272
0
        return ret;
273
0
    }
274
275
65
    std::unique_ptr<Transaction> txn;
276
65
    TxnErrorCode err = txn_kv->create_txn(&txn);
277
65
    if (err != TxnErrorCode::TXN_OK) {
278
0
        LOG(WARNING) << "failed to create txn";
279
0
        return -1;
280
0
    }
281
282
65
    std::string key, val;
283
65
    meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
284
65
                     tablet_idx.partition_id(), tablet_id},
285
65
                    &key);
286
65
    err = txn->get(key, &val);
287
65
    if (err != TxnErrorCode::TXN_OK) {
288
0
        LOG(WARNING) << fmt::format(
289
0
                "failed to get tablet, err={}, table_id={}, index_id={}, partition_id={}, "
290
0
                "tablet_id={} key={}",
291
0
                err, tablet_idx.table_id(), tablet_idx.index_id(), tablet_idx.partition_id(),
292
0
                tablet_id, hex(key));
293
0
        return -1;
294
0
    }
295
65
    if (!tablet_meta.ParseFromString(val)) [[unlikely]] {
296
0
        LOG(WARNING) << fmt::format("malformed tablet meta, tablet_id={} key={}", tablet_id,
297
0
                                    hex(key));
298
0
        return -1;
299
0
    }
300
65
    return 0;
301
65
}
302
} // namespace doris::cloud