Coverage Report

Created: 2026-03-17 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_meta_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset_meta_manager.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include <boost/algorithm/string/trim.hpp>
24
#include <fstream>
25
#include <functional>
26
#include <memory>
27
#include <new>
28
#include <string>
29
#include <string_view>
30
#include <vector>
31
32
#include "common/logging.h"
33
#include "storage/binlog.h"
34
#include "storage/olap_define.h"
35
#include "storage/olap_meta.h"
36
#include "storage/utils.h"
37
#include "util/debug_points.h"
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
using namespace ErrorCode;
42
43
bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
44
0
                                          const RowsetId& rowset_id) {
45
0
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
46
0
    std::string value;
47
0
    return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value);
48
0
}
49
50
40
Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
51
40
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
52
40
    std::string value;
53
40
    return meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
54
40
}
55
56
Status RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
57
                                          const RowsetId& rowset_id,
58
9
                                          RowsetMetaSharedPtr rowset_meta) {
59
9
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
60
9
    std::string value;
61
9
    Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
62
9
    if (s.is<META_KEY_NOT_FOUND>()) {
63
3
        return Status::Error<META_KEY_NOT_FOUND>("rowset id: {} not found.", key);
64
6
    } else if (!s.ok()) {
65
0
        return Status::Error<IO_ERROR>("load rowset id: {} failed.", key);
66
0
    }
67
6
    bool ret = rowset_meta->init(value);
68
6
    if (!ret) {
69
0
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse rowset meta failed. rowset id: {}",
70
0
                                                       key);
71
0
    }
72
6
    return Status::OK();
73
6
}
74
75
Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
76
69
                               const RowsetMetaPB& rowset_meta_pb, bool enable_binlog) {
77
69
    if (rowset_meta_pb.partition_id() <= 0) {
78
22
        LOG(WARNING) << "invalid partition id " << rowset_meta_pb.partition_id() << " tablet "
79
22
                     << rowset_meta_pb.tablet_id();
80
        // TODO(dx): after fix partition id eq 0 bug, fix it
81
        // return Status::InternalError("invaid partition id {} tablet {}",
82
        //  rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id());
83
22
    }
84
69
    DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", {
85
69
        long partition_id = rowset_meta_pb.partition_id();
86
69
        auto& rs_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb);
87
69
        rs_pb.set_partition_id(0);
88
69
        LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old="
89
69
                     << partition_id << " new=" << rowset_meta_pb.DebugString();
90
69
    });
91
69
    if (enable_binlog) {
92
2
        return _save_with_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb);
93
67
    } else {
94
67
        return _save(meta, tablet_uid, rowset_id, rowset_meta_pb);
95
67
    }
96
69
}
97
98
Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
99
67
                                const RowsetMetaPB& rowset_meta_pb) {
100
67
    std::string key =
101
67
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
102
67
    std::string value;
103
67
    if (!rowset_meta_pb.SerializeToString(&value)) {
104
0
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
105
0
                                                       key);
106
0
    }
107
108
67
    return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
109
67
}
110
111
Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid,
112
                                            const RowsetId& rowset_id,
113
2
                                            const RowsetMetaPB& rowset_meta_pb) {
114
    // create rowset write data
115
2
    std::string rowset_key =
116
2
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
117
2
    std::string rowset_value;
118
2
    if (!rowset_meta_pb.SerializeToString(&rowset_value)) {
119
0
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
120
0
                                                       rowset_key);
121
0
    }
122
123
    // create binlog write data
124
    // binlog_meta_key format: {kBinlogPrefix}meta_{tablet_uid}_{version}_{rowset_id}
125
    // binlog_data_key format: {kBinlogPrefix}data_{tablet_uid}_{version}_{rowset_id}
126
    // version is formatted to 20 bytes to avoid the problem of sorting, version is lower, timestamp is lower
127
    // binlog key is not supported for cumulative rowset
128
2
    if (rowset_meta_pb.start_version() != rowset_meta_pb.end_version()) {
129
0
        return Status::Error<ROWSET_BINLOG_NOT_ONLY_ONE_VERSION>(
130
0
                "binlog key is not supported for cumulative rowset. rowset id:{}", rowset_key);
131
0
    }
132
2
    auto version = rowset_meta_pb.start_version();
133
2
    std::string binlog_meta_key = make_binlog_meta_key(tablet_uid, version, rowset_id);
134
2
    std::string binlog_data_key = make_binlog_data_key(tablet_uid, version, rowset_id);
135
2
    BinlogMetaEntryPB binlog_meta_entry_pb;
136
2
    binlog_meta_entry_pb.set_version(version);
137
2
    binlog_meta_entry_pb.set_tablet_id(rowset_meta_pb.tablet_id());
138
2
    binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id());
139
2
    binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments());
140
2
    binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time());
141
2
    binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2());
142
2
    std::string binlog_meta_value;
143
2
    if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) {
144
0
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize binlog pb failed. rowset id:{}",
145
0
                                                       binlog_meta_key);
146
0
    }
147
148
    // create batch entries
149
2
    std::vector<OlapMeta::BatchEntry> entries = {
150
2
            {std::cref(rowset_key), std::cref(rowset_value)},
151
2
            {std::cref(binlog_meta_key), std::cref(binlog_meta_value)},
152
2
            {std::cref(binlog_data_key), std::cref(rowset_value)}};
153
154
2
    return meta->put(META_COLUMN_FAMILY_INDEX, entries);
155
2
}
156
157
std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta,
158
                                                                 TabletUid tablet_uid,
159
                                                                 std::string_view binlog_version,
160
0
                                                                 int64_t segment_idx) {
161
0
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
162
0
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);
163
164
0
    std::vector<std::string> binlog_files;
165
0
    std::string rowset_id;
166
0
    int64_t num_segments = -1;
167
0
    auto traverse_func = [&rowset_id, &num_segments](std::string_view key,
168
0
                                                     std::string_view value) -> bool {
169
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
170
        // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
171
        // check starts with "binlog_meta_"
172
0
        if (!starts_with_binlog_meta(key)) {
173
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
174
0
            return false;
175
0
        }
176
0
        if (auto pos = key.rfind('_'); pos == std::string::npos) {
177
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
178
0
            return false;
179
0
        } else {
180
0
            rowset_id = key.substr(pos + 1);
181
0
        }
182
183
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
184
0
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
185
0
            LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value);
186
0
            return false;
187
0
        }
188
0
        num_segments = binlog_meta_entry_pb.num_segments();
189
190
0
        return false;
191
0
    };
192
193
    // get binlog meta by prefix
194
0
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
195
0
    if (!status.ok() || rowset_id.empty() || num_segments < 0) {
196
0
        LOG(WARNING) << fmt::format(
197
0
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
198
0
                "rowset_id:{}, num_segments:{}",
199
0
                tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id,
200
0
                num_segments);
201
0
    }
202
203
    // construct binlog_files list
204
0
    if (segment_idx >= num_segments) {
205
0
        LOG(WARNING) << fmt::format("invalid segment idx:{}, num_segments:{}", segment_idx,
206
0
                                    num_segments);
207
0
        return binlog_files;
208
0
    }
209
0
    for (int64_t i = 0; i < num_segments; ++i) {
210
        // TODO(Drogon): Update to filesystem path
211
0
        auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
212
0
        binlog_files.emplace_back(std::move(segment_file));
213
0
    }
214
0
    return binlog_files;
215
0
}
216
217
std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info(
218
0
        OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version) {
219
0
    VLOG_DEBUG << fmt::format("tablet_uid:{}, binlog_version:{}", tablet_uid.to_string(),
220
0
                              binlog_version);
221
0
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
222
0
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);
223
224
0
    std::string rowset_id;
225
0
    int64_t num_segments = -1;
226
0
    auto traverse_func = [&rowset_id, &num_segments](std::string_view key,
227
0
                                                     std::string_view value) -> bool {
228
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
229
        // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
230
0
        auto pos = key.rfind('_');
231
0
        if (pos == std::string::npos) {
232
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
233
0
            return false;
234
0
        }
235
0
        rowset_id = key.substr(pos + 1);
236
237
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
238
0
        binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()));
239
0
        num_segments = binlog_meta_entry_pb.num_segments();
240
241
0
        return false;
242
0
    };
243
244
    // get binlog meta by prefix
245
0
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
246
0
    if (!status.ok() || rowset_id.empty() || num_segments < 0) {
247
0
        LOG(WARNING) << fmt::format(
248
0
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
249
0
                "rowset_id:{}, num_segments:{}",
250
0
                tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id,
251
0
                num_segments);
252
0
    }
253
254
0
    return std::make_pair(rowset_id, num_segments);
255
0
}
256
257
std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid,
258
                                                      std::string_view binlog_version,
259
0
                                                      std::string_view rowset_id) {
260
0
    auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id);
261
0
    VLOG_DEBUG << fmt::format("get binlog_meta_key:{}", binlog_data_key);
262
263
0
    std::string binlog_meta_value;
264
0
    Status status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_meta_value);
265
0
    if (!status.ok()) {
266
0
        LOG(WARNING) << fmt::format(
267
0
                "fail to get binlog meta. tablet uid:{}, binlog version:{}, "
268
0
                "rowset_id:{}, status:{}",
269
0
                tablet_uid.to_string(), binlog_version, rowset_id, status.to_string());
270
0
        return "";
271
0
    }
272
0
    return binlog_meta_value;
273
0
}
274
275
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
276
                                                  const std::vector<int64_t>& binlog_versions,
277
0
                                                  RowsetBinlogMetasPB* metas_pb) {
278
0
    if (binlog_versions.empty()) {
279
0
        return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb);
280
0
    } else {
281
0
        return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb);
282
0
    }
283
0
}
284
285
Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
286
                                                   const std::vector<int64_t>& binlog_versions,
287
0
                                                   RowsetBinlogMetasPB* metas_pb) {
288
0
    Status status;
289
0
    auto tablet_uid_str = tablet_uid.to_string();
290
0
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str](
291
0
                                 std::string_view key, std::string_view value) -> bool {
292
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
293
0
        if (!starts_with_binlog_meta(key)) {
294
0
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
295
0
            status = Status::InternalError(err_msg);
296
0
            LOG(WARNING) << err_msg;
297
0
            return false;
298
0
        }
299
300
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
301
0
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
302
0
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
303
0
            status = Status::InternalError(err_msg);
304
0
            LOG(WARNING) << err_msg;
305
0
            return false;
306
0
        }
307
0
        auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
308
309
0
        auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
310
0
        binlog_meta_pb->set_rowset_id(rowset_id);
311
0
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
312
0
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
313
0
        binlog_meta_pb->set_meta_key(std::string {key});
314
0
        binlog_meta_pb->set_meta(std::string {value});
315
316
0
        auto binlog_data_key =
317
0
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
318
0
        std::string binlog_data;
319
0
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
320
0
        if (!status.ok()) {
321
0
            LOG(WARNING) << status.to_string();
322
0
            return false;
323
0
        }
324
0
        binlog_meta_pb->set_data_key(binlog_data_key);
325
0
        binlog_meta_pb->set_data(binlog_data);
326
327
0
        return false;
328
0
    };
329
330
0
    for (auto& binlog_version : binlog_versions) {
331
0
        auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, binlog_version);
332
0
        Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
333
0
        if (!iterStatus.ok()) {
334
0
            LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
335
0
                                        prefix_key, iterStatus.to_string());
336
0
            return iterStatus;
337
0
        }
338
0
        if (!status.ok()) {
339
0
            return status;
340
0
        }
341
0
    }
342
0
    return status;
343
0
}
344
345
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
346
4
                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
347
4
    Status status;
348
4
    auto tablet_uid_str = tablet_uid.to_string();
349
4
    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
350
4
    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
351
4
    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
352
4
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
353
4
                                 std::string_view key, std::string_view value) -> bool {
354
0
        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
355
0
        if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
356
            // All binlog meta has been scanned
357
0
            return false;
358
0
        }
359
360
0
        if (!starts_with_binlog_meta(key)) {
361
0
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
362
0
            status = Status::InternalError(err_msg);
363
0
            LOG(WARNING) << err_msg;
364
0
            return false;
365
0
        }
366
367
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
368
0
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
369
0
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
370
0
            status = Status::InternalError(err_msg);
371
0
            LOG(WARNING) << err_msg;
372
0
            return false;
373
0
        }
374
375
0
        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
376
0
        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
377
0
        binlog_meta_pb->set_rowset_id(rowset_id);
378
0
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
379
0
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
380
0
        binlog_meta_pb->set_meta_key(std::string {key});
381
0
        binlog_meta_pb->set_meta(std::string {value});
382
383
0
        auto binlog_data_key =
384
0
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
385
0
        std::string binlog_data;
386
0
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
387
0
        if (!status.ok()) {
388
0
            LOG(WARNING) << status.to_string();
389
0
            return false;
390
0
        }
391
0
        binlog_meta_pb->set_data_key(binlog_data_key);
392
0
        binlog_meta_pb->set_data(binlog_data);
393
394
0
        return true;
395
0
    };
396
397
4
    Status iterStatus =
398
4
            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
399
4
    if (!iterStatus.ok()) {
400
0
        LOG(WARNING) << fmt::format(
401
0
                "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
402
0
                version.to_string(), iterStatus.to_string());
403
0
        return iterStatus;
404
0
    }
405
4
    return status;
406
4
}
407
408
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
409
0
                                                       RowsetBinlogMetasPB* metas_pb) {
410
0
    Status status;
411
0
    auto tablet_uid_str = tablet_uid.to_string();
412
0
    int64_t tablet_id = 0;
413
0
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id](
414
0
                                 std::string_view key, std::string_view value) -> bool {
415
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
416
0
        if (!starts_with_binlog_meta(key)) {
417
0
            LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key);
418
0
            return false;
419
0
        }
420
421
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
422
0
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
423
0
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
424
0
            status = Status::InternalError(err_msg);
425
0
            LOG(WARNING) << err_msg;
426
0
            return false;
427
0
        }
428
0
        if (tablet_id == 0) {
429
0
            tablet_id = binlog_meta_entry_pb.tablet_id();
430
0
        } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) {
431
            // scan all binlog meta, so tablet_id should be same:
432
0
            return false;
433
0
        }
434
0
        auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
435
436
0
        auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
437
0
        binlog_meta_pb->set_rowset_id(rowset_id);
438
0
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
439
0
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
440
0
        binlog_meta_pb->set_meta_key(std::string {key});
441
0
        binlog_meta_pb->set_meta(std::string {value});
442
443
0
        auto binlog_data_key =
444
0
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
445
0
        std::string binlog_data;
446
0
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
447
0
        if (!status.ok()) {
448
0
            LOG(WARNING) << status;
449
0
            return false;
450
0
        }
451
0
        binlog_meta_pb->set_data_key(binlog_data_key);
452
0
        binlog_meta_pb->set_data(binlog_data);
453
454
0
        return true;
455
0
    };
456
457
0
    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
458
0
    Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
459
0
    if (!iterStatus.ok()) {
460
0
        LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
461
0
                                    prefix_key, iterStatus.to_string());
462
0
        return iterStatus;
463
0
    }
464
0
    return status;
465
0
}
466
467
7
Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
468
7
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
469
7
    VLOG_NOTICE << "start to remove rowset, key:" << key;
470
7
    Status status = meta->remove(META_COLUMN_FAMILY_INDEX, key);
471
7
    VLOG_NOTICE << "remove rowset key:" << key << " finished";
472
7
    return status;
473
7
}
474
475
0
Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) {
476
    // Please do not remove std::vector<std::string>, more info refer to pr#23190
477
0
    return meta->remove(META_COLUMN_FAMILY_INDEX,
478
0
                        std::vector<std::string> {kBinlogMetaPrefix.data() + suffix,
479
0
                                                  kBinlogDataPrefix.data() + suffix});
480
0
}
481
482
Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
483
0
                                              RowsetBinlogMetasPB* metas_pb) {
484
0
    std::vector<OlapMeta::BatchEntry> entries;
485
0
    const auto tablet_uid_str = tablet_uid.to_string();
486
487
0
    for (auto& rowset_binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) {
488
0
        auto& rowset_id = rowset_binlog_meta.rowset_id();
489
0
        auto version = rowset_binlog_meta.version();
490
491
0
        auto meta_key = rowset_binlog_meta.mutable_meta_key();
492
0
        *meta_key = make_binlog_meta_key(tablet_uid_str, version, rowset_id);
493
0
        auto data_key = rowset_binlog_meta.mutable_data_key();
494
0
        *data_key = make_binlog_data_key(tablet_uid_str, version, rowset_id);
495
496
0
        entries.emplace_back(*meta_key, rowset_binlog_meta.meta());
497
0
        entries.emplace_back(*data_key, rowset_binlog_meta.data());
498
0
    }
499
500
0
    return meta->put(META_COLUMN_FAMILY_INDEX, entries);
501
0
}
502
503
Status RowsetMetaManager::traverse_rowset_metas(
504
        OlapMeta* meta,
505
45
        std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func) {
506
45
    auto traverse_rowset_meta_func = [&func](std::string_view key, std::string_view value) -> bool {
507
0
        std::vector<std::string> parts;
508
        // key format: rst_uuid_rowset_id
509
0
        RETURN_IF_ERROR(split_string(key, '_', &parts));
510
0
        if (parts.size() != 3) {
511
0
            LOG(WARNING) << "invalid rowset key:" << key << ", splitted size:" << parts.size();
512
0
            return true;
513
0
        }
514
0
        RowsetId rowset_id;
515
0
        rowset_id.init(parts[2]);
516
0
        std::vector<std::string> uid_parts;
517
0
        RETURN_IF_ERROR(split_string(parts[1], '-', &uid_parts));
518
0
        TabletUid tablet_uid(uid_parts[0], uid_parts[1]);
519
0
        return func(tablet_uid, rowset_id, value);
520
0
    };
521
45
    Status status =
522
45
            meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
523
45
    return status;
524
45
}
525
526
Status RowsetMetaManager::traverse_binlog_metas(
527
        OlapMeta* meta,
528
0
        std::function<bool(std::string_view, std::string_view, bool)> const& collector) {
529
0
    std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false);
530
0
    bool seek_found = false;
531
0
    Status status;
532
0
    auto traverse_binlog_meta_func = [&last_info, &seek_found, &collector](
533
0
                                             std::string_view key, std::string_view value) -> bool {
534
0
        seek_found = true;
535
0
        auto& [last_prefix, need_collect] = last_info;
536
0
        size_t pos = key.find('_', kBinlogMetaPrefix.size());
537
0
        if (pos == std::string::npos) {
538
0
            LOG(WARNING) << "invalid binlog meta key: " << key;
539
0
            return true;
540
0
        }
541
0
        std::string_view key_view(key.data(), pos);
542
0
        std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1);
543
544
0
        if (last_prefix_view != key_view) {
545
0
            need_collect = collector(key, value, true);
546
0
            last_prefix = std::string(key_view) + "~";
547
0
        } else if (need_collect) {
548
0
            collector(key, value, false);
549
0
        }
550
551
0
        return need_collect;
552
0
    };
553
554
0
    do {
555
0
        seek_found = false;
556
0
        status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(),
557
0
                               traverse_binlog_meta_func);
558
0
    } while (status.ok() && seek_found);
559
560
0
    return status;
561
0
}
562
563
Status RowsetMetaManager::save_partial_update_info(
564
        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
565
0
        const PartialUpdateInfoPB& partial_update_info_pb) {
566
0
    std::string key =
567
0
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
568
0
    std::string value;
569
0
    if (!partial_update_info_pb.SerializeToString(&value)) {
570
0
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
571
0
                "serialize partial update info failed. key={}", key);
572
0
    }
573
0
    VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size();
574
0
    return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
575
0
}
576
577
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
578
                                                      int64_t partition_id, int64_t txn_id,
579
0
                                                      PartialUpdateInfoPB* partial_update_info_pb) {
580
0
    std::string key =
581
0
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
582
0
    std::string value;
583
0
    Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
584
0
    if (status.is<META_KEY_NOT_FOUND>()) {
585
0
        return status;
586
0
    }
587
0
    if (!status.ok()) {
588
0
        LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
589
0
                    tablet_id, partition_id, txn_id);
590
0
        return status;
591
0
    }
592
0
    if (!partial_update_info_pb->ParseFromString(value)) {
593
0
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
594
0
                "fail to parse partial update info content to protobuf object. tablet_id={}, "
595
0
                "partition_id={}, txn_id={}",
596
0
                tablet_id, partition_id, txn_id);
597
0
    }
598
0
    return Status::OK();
599
0
}
600
601
Status RowsetMetaManager::traverse_partial_update_info(
602
        OlapMeta* meta,
603
0
        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
604
0
    auto traverse_partial_update_info_func = [&func](std::string_view key,
605
0
                                                     std::string_view value) -> bool {
606
0
        std::vector<std::string> parts;
607
        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
608
0
        RETURN_IF_ERROR(split_string(key, '_', &parts));
609
0
        if (parts.size() != 4) {
610
0
            LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size());
611
0
            return true;
612
0
        }
613
0
        int64_t tablet_id = std::stoll(parts[1]);
614
0
        int64_t partition_id = std::stoll(parts[2]);
615
0
        int64_t txn_id = std::stoll(parts[3]);
616
0
        return func(tablet_id, partition_id, txn_id, value);
617
0
    };
618
0
    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
619
0
                         traverse_partial_update_info_func);
620
0
}
621
622
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
623
0
                                                     int64_t partition_id, int64_t txn_id) {
624
0
    std::string key =
625
0
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
626
0
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
627
0
    VLOG_NOTICE << "remove partial update info, key=" << key;
628
0
    return res;
629
0
}
630
631
Status RowsetMetaManager::remove_partial_update_infos(
632
0
        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
633
0
    std::vector<std::string> remove_keys;
634
0
    for (auto [tablet_id, partition_id, txn_id] : keys) {
635
0
        remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
636
0
                                          partition_id, txn_id));
637
0
    }
638
0
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
639
0
    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
640
0
    return res;
641
0
}
642
643
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
644
0
                                                                    int64_t tablet_id) {
645
0
    std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
646
0
    std::vector<std::string> remove_keys;
647
0
    auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool {
648
0
        remove_keys.emplace_back(key);
649
0
        return true;
650
0
    };
651
0
    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
652
0
                << " removed keys size: " << remove_keys.size();
653
0
    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
654
0
    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
655
0
}
656
#include "common/compile_check_end.h"
657
} // namespace doris