Coverage Report

Created: 2026-05-12 22:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_meta_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/rowset_meta_manager.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include <boost/algorithm/string/trim.hpp>
24
#include <fstream>
25
#include <functional>
26
#include <memory>
27
#include <new>
28
#include <string>
29
#include <string_view>
30
#include <vector>
31
32
#include "common/logging.h"
33
#include "storage/binlog.h"
34
#include "storage/olap_define.h"
35
#include "storage/olap_meta.h"
36
#include "storage/utils.h"
37
#include "util/debug_points.h"
38
39
namespace doris {
40
using namespace ErrorCode;
41
42
bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
43
0
                                          const RowsetId& rowset_id) {
44
0
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
45
0
    std::string value;
46
0
    return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value);
47
0
}
48
49
41
// Performs a point read of the rowset meta entry for (tablet_uid, rowset_id) and
// returns the status of that read; the value read is not used.
Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
    // Rowset meta key layout: {ROWSET_PREFIX}{tablet_uid}_{rowset_id}
    std::string rowset_key(ROWSET_PREFIX);
    rowset_key.append(tablet_uid.to_string()).append("_").append(rowset_id.to_string());
    std::string discarded;
    return meta->get(META_COLUMN_FAMILY_INDEX, rowset_key, &discarded);
}
54
55
// Reads the serialized rowset meta stored under {ROWSET_PREFIX}{tablet_uid}_{rowset_id}
// and initializes `rowset_meta` from it.
//
// Returns:
//   META_KEY_NOT_FOUND        if no entry exists for the key;
//   IO_ERROR                  if the read fails for any other reason (the underlying
//                             status is included in the message);
//   SERIALIZE_PROTOBUF_ERROR  if `rowset_meta->init` rejects the stored bytes;
//   OK                        otherwise.
Status RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
                                          const RowsetId& rowset_id,
                                          RowsetMetaSharedPtr rowset_meta) {
    DCHECK(rowset_meta != nullptr);
    std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
    std::string value;
    Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
    if (s.is<META_KEY_NOT_FOUND>()) {
        return Status::Error<META_KEY_NOT_FOUND>("rowset id: {} not found.", key);
    }
    if (!s.ok()) {
        // Keep the underlying error: remapping to a bare IO_ERROR would otherwise
        // hide why the meta read failed.
        return Status::Error<IO_ERROR>("load rowset id: {} failed. status: {}", key,
                                       s.to_string());
    }
    if (!rowset_meta->init(value)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse rowset meta failed. rowset id: {}",
                                                       key);
    }
    return Status::OK();
}
73
74
// Persists `rowset_meta_pb` under the rowset meta key, optionally together with
// binlog entries, depending on `binlog_format`:
//   * no value                -> only the rowset meta (_save);
//   * STATEMENT_AND_SNAPSHOT  -> rowset meta + binlog meta/data entries (_save_with_ccr_binlog);
//   * ROW                     -> rowset meta + one row-binlog entry per rowset in
//                                `attach_rowset_map` (_save_with_row_binlog).
// `attach_rowset_map` is only read for ROW, where it must be non-null (and is
// expected to be non-empty). Any other format value is rejected with InternalError.
Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
                               const RowsetMetaPB& rowset_meta_pb,
                               std::optional<BinlogFormatPB> binlog_format,
                               const std::map<RowsetId, RowsetMetaPB>* attach_rowset_map) {
    if (rowset_meta_pb.partition_id() <= 0) {
        LOG(WARNING) << "invalid partition id " << rowset_meta_pb.partition_id() << " tablet "
                     << rowset_meta_pb.tablet_id();
        // TODO(dx): after fix partition id eq 0 bug, fix it
        // return Status::InternalError("invaid partition id {} tablet {}",
        //  rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id());
    }
    DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", {
        long partition_id = rowset_meta_pb.partition_id();
        auto& rs_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb);
        rs_pb.set_partition_id(0);
        LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old="
                     << partition_id << " new=" << rowset_meta_pb.DebugString();
    });
    if (!binlog_format.has_value()) {
        return _save(meta, tablet_uid, rowset_id, rowset_meta_pb);
    }
    if (*binlog_format == BinlogFormatPB::STATEMENT_AND_SNAPSHOT) {
        return _save_with_ccr_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb);
    }
    DCHECK_EQ(*binlog_format, BinlogFormatPB::ROW);
    DCHECK(attach_rowset_map != nullptr);
    DCHECK(attach_rowset_map == nullptr || !attach_rowset_map->empty());
    // The DCHECKs above compile away in release builds. Fail with an error instead
    // of treating an unexpected format as ROW or dereferencing a null map.
    if (*binlog_format != BinlogFormatPB::ROW) {
        return Status::InternalError("unexpected binlog format {} when saving rowset {}",
                                     static_cast<int>(*binlog_format), rowset_id.to_string());
    }
    if (attach_rowset_map == nullptr) {
        return Status::InternalError(
                "attach rowset map is null when saving row binlog for rowset {}",
                rowset_id.to_string());
    }
    return _save_with_row_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb, *attach_rowset_map);
}
103
104
// Serializes `rowset_meta_pb` and writes it under {ROWSET_PREFIX}{tablet_uid}_{rowset_id}
// in the meta column family. Returns SERIALIZE_PROTOBUF_ERROR if serialization fails,
// otherwise the status of the put.
Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
                                const RowsetMetaPB& rowset_meta_pb) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string serialized;
    const bool serialized_ok = rowset_meta_pb.SerializeToString(&serialized);
    if (serialized_ok) {
        return meta->put(META_COLUMN_FAMILY_INDEX, rowset_key, serialized);
    }
    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
                                                   rowset_key);
}
116
117
// Writes three entries with a single batched OlapMeta::put:
//   1. the serialized rowset meta under {ROWSET_PREFIX}{tablet_uid}_{rowset_id};
//   2. a BinlogMetaEntryPB under the binlog meta key;
//   3. the same serialized rowset meta under the binlog data key.
// Binlog key formats:
//   meta: {kBinlogPrefix}meta_{tablet_uid}_{version}_{rowset_id}
//   data: {kBinlogPrefix}data_{tablet_uid}_{version}_{rowset_id}
// The version is formatted to 20 bytes so keys sort by version.
// Only single-version rowsets are accepted; a rowset whose start and end versions
// differ (e.g. a cumulative rowset) yields ROWSET_BINLOG_NOT_ONLY_ONE_VERSION.
Status RowsetMetaManager::_save_with_ccr_binlog(OlapMeta* meta, TabletUid tablet_uid,
                                                const RowsetId& rowset_id,
                                                const RowsetMetaPB& rowset_meta_pb) {
    const std::string rowset_key =
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string());
    std::string serialized_rowset;
    if (!rowset_meta_pb.SerializeToString(&serialized_rowset)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
                                                       rowset_key);
    }

    // Binlog keys embed exactly one version.
    const auto version = rowset_meta_pb.start_version();
    if (version != rowset_meta_pb.end_version()) {
        return Status::Error<ROWSET_BINLOG_NOT_ONLY_ONE_VERSION>(
                "binlog key is not supported for cumulative rowset. rowset id:{}", rowset_key);
    }
    const std::string meta_key = make_binlog_meta_key(tablet_uid, version, rowset_id);
    const std::string data_key = make_binlog_data_key(tablet_uid, version, rowset_id);

    BinlogMetaEntryPB entry;
    entry.set_version(version);
    entry.set_tablet_id(rowset_meta_pb.tablet_id());
    entry.set_rowset_id(rowset_meta_pb.rowset_id());
    entry.set_num_segments(rowset_meta_pb.num_segments());
    entry.set_creation_time(rowset_meta_pb.creation_time());
    entry.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2());
    std::string serialized_entry;
    if (!entry.SerializeToString(&serialized_entry)) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize binlog pb failed. rowset id:{}",
                                                       meta_key);
    }

    // The batch references the strings above; they outlive the put call.
    // The binlog data value is the serialized rowset meta itself.
    std::vector<OlapMeta::BatchEntry> batch;
    batch.reserve(3);
    batch.emplace_back(rowset_key, serialized_rowset);
    batch.emplace_back(meta_key, serialized_entry);
    batch.emplace_back(data_key, serialized_rowset);
    return meta->put(META_COLUMN_FAMILY_INDEX, batch);
}
162
163
// Writes, with a single batched OlapMeta::put, the rowset meta of `rowset_id` under
// {ROWSET_PREFIX}{tablet_uid}_{rowset_id}, plus one entry per rowset in
// `attach_rowset_map` keyed by make_row_binlog_key(tablet_uid, rowset_id, <attached id>).
// Each attached meta is expected to be a row-binlog rowset (checked by DCHECK only).
// Returns SERIALIZE_PROTOBUF_ERROR on the first serialization failure.
Status RowsetMetaManager::_save_with_row_binlog(
        OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
        const RowsetMetaPB& rowset_meta_pb,
        const std::map<RowsetId, RowsetMetaPB>& attach_rowset_map) {
    const size_t entry_count = attach_rowset_map.size() + 1;
    // The batch built at the end refers into these vectors, so they are fully
    // populated before any reference is taken.
    std::vector<std::string> keys;
    std::vector<std::string> values;
    keys.reserve(entry_count);
    values.reserve(entry_count);

    keys.emplace_back(
            fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string()));
    values.emplace_back();
    if (!rowset_meta_pb.SerializeToString(&values.back())) {
        return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}",
                                                       keys.front());
    }

    for (const auto& [attached_id, attached_pb] : attach_rowset_map) {
        keys.emplace_back(make_row_binlog_key(tablet_uid, rowset_id, attached_id));
        DCHECK(attached_pb.has_is_row_binlog() && attached_pb.is_row_binlog())
                << attached_pb.ShortDebugString();
        values.emplace_back();
        if (!attached_pb.SerializeToString(&values.back())) {
            return Status::Error<SERIALIZE_PROTOBUF_ERROR>(
                    "serialize rowset pb failed. rowset id:{}", keys.back());
        }
    }

    std::vector<OlapMeta::BatchEntry> batch;
    batch.reserve(keys.size());
    auto value_it = values.begin();
    for (auto key_it = keys.begin(); key_it != keys.end(); ++key_it, ++value_it) {
        batch.emplace_back(*key_it, *value_it);
    }
    return meta->put(META_COLUMN_FAMILY_INDEX, batch);
}
196
197
std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta,
198
                                                                 TabletUid tablet_uid,
199
                                                                 std::string_view binlog_version,
200
0
                                                                 int64_t segment_idx) {
201
0
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
202
0
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);
203
204
0
    std::vector<std::string> binlog_files;
205
0
    std::string rowset_id;
206
0
    int64_t num_segments = -1;
207
0
    auto traverse_func = [&rowset_id, &num_segments](std::string_view key,
208
0
                                                     std::string_view value) -> bool {
209
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
210
        // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
211
        // check starts with "binlog_meta_"
212
0
        if (!starts_with_binlog_meta(key)) {
213
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
214
0
            return false;
215
0
        }
216
0
        if (auto pos = key.rfind('_'); pos == std::string::npos) {
217
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
218
0
            return false;
219
0
        } else {
220
0
            rowset_id = key.substr(pos + 1);
221
0
        }
222
223
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
224
0
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
225
0
            LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value);
226
0
            return false;
227
0
        }
228
0
        num_segments = binlog_meta_entry_pb.num_segments();
229
230
0
        return false;
231
0
    };
232
233
    // get binlog meta by prefix
234
0
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
235
0
    if (!status.ok() || rowset_id.empty() || num_segments < 0) {
236
0
        LOG(WARNING) << fmt::format(
237
0
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
238
0
                "rowset_id:{}, num_segments:{}",
239
0
                tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id,
240
0
                num_segments);
241
0
    }
242
243
    // construct binlog_files list
244
0
    if (segment_idx >= num_segments) {
245
0
        LOG(WARNING) << fmt::format("invalid segment idx:{}, num_segments:{}", segment_idx,
246
0
                                    num_segments);
247
0
        return binlog_files;
248
0
    }
249
0
    for (int64_t i = 0; i < num_segments; ++i) {
250
        // TODO(Drogon): Update to filesystem path
251
0
        auto segment_file = fmt::format("{}_{}.dat", rowset_id, i);
252
0
        binlog_files.emplace_back(std::move(segment_file));
253
0
    }
254
0
    return binlog_files;
255
0
}
256
257
std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info(
258
0
        OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version) {
259
0
    VLOG_DEBUG << fmt::format("tablet_uid:{}, binlog_version:{}", tablet_uid.to_string(),
260
0
                              binlog_version);
261
0
    auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version);
262
0
    VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key);
263
264
0
    std::string rowset_id;
265
0
    int64_t num_segments = -1;
266
0
    auto traverse_func = [&rowset_id, &num_segments](std::string_view key,
267
0
                                                     std::string_view value) -> bool {
268
0
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
269
        // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593'
270
0
        auto pos = key.rfind('_');
271
0
        if (pos == std::string::npos) {
272
0
            LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key);
273
0
            return false;
274
0
        }
275
0
        rowset_id = key.substr(pos + 1);
276
277
0
        BinlogMetaEntryPB binlog_meta_entry_pb;
278
0
        binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()));
279
0
        num_segments = binlog_meta_entry_pb.num_segments();
280
281
0
        return false;
282
0
    };
283
284
    // get binlog meta by prefix
285
0
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
286
0
    if (!status.ok() || rowset_id.empty() || num_segments < 0) {
287
0
        LOG(WARNING) << fmt::format(
288
0
                "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, "
289
0
                "rowset_id:{}, num_segments:{}",
290
0
                tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id,
291
0
                num_segments);
292
0
    }
293
294
0
    return std::make_pair(rowset_id, num_segments);
295
0
}
296
297
std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid,
298
                                                      std::string_view binlog_version,
299
0
                                                      std::string_view rowset_id) {
300
0
    auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id);
301
0
    VLOG_DEBUG << fmt::format("get binlog_meta_key:{}", binlog_data_key);
302
303
0
    std::string binlog_meta_value;
304
0
    Status status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_meta_value);
305
0
    if (!status.ok()) {
306
0
        LOG(WARNING) << fmt::format(
307
0
                "fail to get binlog meta. tablet uid:{}, binlog version:{}, "
308
0
                "rowset_id:{}, status:{}",
309
0
                tablet_uid.to_string(), binlog_version, rowset_id, status.to_string());
310
0
        return "";
311
0
    }
312
0
    return binlog_meta_value;
313
0
}
314
315
// Collects binlog metas of the tablet into `metas_pb`: all of them when
// `binlog_versions` is empty, otherwise only the listed versions.
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                  const std::vector<int64_t>& binlog_versions,
                                                  RowsetBinlogMetasPB* metas_pb) {
    if (!binlog_versions.empty()) {
        return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb);
    }
    return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb);
}
324
325
// For each version in `binlog_versions`, takes the first entry under
// make_binlog_meta_key_prefix(tablet_uid, version). It appends that binlog meta, plus
// the matching binlog data read from the data key, to `metas_pb`. Returns the first
// iteration, key, parse or read error. Versions with no matching entry add nothing
// (presumably iterate returns OK when nothing matches — confirm against OlapMeta).
Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                   const std::vector<int64_t>& binlog_versions,
                                                   RowsetBinlogMetasPB* metas_pb) {
    Status visit_status;
    const auto uid_str = tablet_uid.to_string();
    // Handles a single binlog meta entry. Always returns false, so only the first
    // entry under each version prefix is consumed.
    auto visit_first = [meta, metas_pb, &visit_status, &uid_str](std::string_view key,
                                                                  std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        const auto& rs_id = entry_pb.rowset_id_v2();

        auto* out = metas_pb->add_rowset_binlog_metas();
        out->set_rowset_id(rs_id);
        out->set_version(entry_pb.version());
        out->set_num_segments(entry_pb.num_segments());
        out->set_meta_key(std::string(key));
        out->set_meta(std::string(value));

        auto data_key = make_binlog_data_key(uid_str, entry_pb.version(), rs_id);
        std::string data;
        visit_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &data);
        if (visit_status.ok()) {
            out->set_data_key(data_key);
            out->set_data(data);
        } else {
            LOG(WARNING) << visit_status.to_string();
        }
        return false;
    };

    for (const int64_t version : binlog_versions) {
        auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, version);
        Status iter_status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, visit_first);
        if (!iter_status.ok()) {
            LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                        prefix_key, iter_status.to_string());
            return iter_status;
        }
        if (!visit_status.ok()) {
            return visit_status;
        }
    }
    return visit_status;
}
384
385
// Appends to `metas_pb` every binlog meta of the tablet, together with its binlog data,
// with version in [version.first, version.second]. The scan starts at the meta key
// prefix of version.first, within the tablet's binlog meta prefix. It stops at the
// first key that compares greater than the prefix of version.second + 1 (binlog meta
// keys are binary comparable). Returns the iteration error, or the first key, parse
// or read error.
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                                  Version version, RowsetBinlogMetasPB* metas_pb) {
    Status visit_status;
    auto uid_str = tablet_uid.to_string();
    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
    auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);

    // Returns true to keep scanning, false to stop (range end reached or error).
    auto visit_in_range = [meta, metas_pb, &visit_status, &uid_str, &end_key](
                                  std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
        if (std::string_view(end_key) < key) {
            // Past the last requested version: everything in range has been scanned.
            return false;
        }

        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        BinlogMetaEntryPB entry_pb;
        if (!entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            visit_status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        const auto& rs_id = entry_pb.rowset_id_v2();
        auto* out = metas_pb->add_rowset_binlog_metas();
        out->set_rowset_id(rs_id);
        out->set_version(entry_pb.version());
        out->set_num_segments(entry_pb.num_segments());
        out->set_meta_key(std::string(key));
        out->set_meta(std::string(value));

        auto data_key = make_binlog_data_key(uid_str, entry_pb.version(), rs_id);
        std::string data;
        visit_status = meta->get(META_COLUMN_FAMILY_INDEX, data_key, &data);
        if (!visit_status.ok()) {
            LOG(WARNING) << visit_status.to_string();
            return false;
        }
        out->set_data_key(data_key);
        out->set_data(data);
        return true;
    };

    Status iter_status =
            meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, visit_in_range);
    if (iter_status.ok()) {
        return visit_status;
    }
    LOG(WARNING) << fmt::format(
            "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
            version.to_string(), iter_status.to_string());
    return iter_status;
}
447
448
// Appends to `metas_pb` every binlog meta under make_binlog_meta_key_prefix(tablet_uid),
// together with its binlog data. The scan ends at the first key not starting with the
// binlog meta prefix, or at the first entry whose tablet id differs from the first
// entry's (logged as a warning). Returns the iteration error, or the first parse or
// data-read error.
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                       RowsetBinlogMetasPB* metas_pb) {
    Status status;
    auto tablet_uid_str = tablet_uid.to_string();
    // 0 means "not seen yet"; set from the first entry scanned.
    int64_t tablet_id = 0;
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id](
                                 std::string_view key, std::string_view value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key);
            return false;
        }

        BinlogMetaEntryPB binlog_meta_entry_pb;
        if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        if (tablet_id == 0) {
            tablet_id = binlog_meta_entry_pb.tablet_id();
        } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) {
            // scan all binlog meta, so tablet_id should be same. A mismatch ends the scan;
            // log it so the truncated result is not silent.
            LOG(WARNING) << fmt::format(
                    "stop scan binlog meta, tablet id mismatch. expected:{}, actual:{}, key:{}",
                    tablet_id, binlog_meta_entry_pb.tablet_id(), key);
            return false;
        }
        const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();

        auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
        binlog_meta_pb->set_rowset_id(rowset_id);
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
        binlog_meta_pb->set_meta_key(std::string {key});
        binlog_meta_pb->set_meta(std::string {value});

        auto binlog_data_key =
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
        std::string binlog_data;
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
        if (!status.ok()) {
            LOG(WARNING) << status;
            return false;
        }
        binlog_meta_pb->set_data_key(binlog_data_key);
        binlog_meta_pb->set_data(binlog_data);

        return true;
    };

    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
    if (!iterStatus.ok()) {
        LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                    prefix_key, iterStatus.to_string());
        return iterStatus;
    }
    return status;
}
506
507
8
// Deletes the rowset meta entry {ROWSET_PREFIX}{tablet_uid}_{rowset_id} from the meta
// column family and returns the status of the delete.
Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
    std::string rowset_key(ROWSET_PREFIX);
    rowset_key.append(tablet_uid.to_string()).append("_").append(rowset_id.to_string());
    VLOG_NOTICE << "start to remove rowset, key:" << rowset_key;
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, rowset_key);
    VLOG_NOTICE << "remove rowset key:" << rowset_key << " finished";
    return remove_status;
}
514
515
0
// Deletes, in one OlapMeta::remove call, the binlog meta entry
// {kBinlogMetaPrefix}{suffix} and the binlog data entry {kBinlogDataPrefix}{suffix}.
Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) {
    // Please do not remove std::vector<std::string>, more info refer to pr#23190
    // Build the keys from the whole prefix rather than `.data()`, which would rely on
    // the prefix view being NUL-terminated.
    return meta->remove(META_COLUMN_FAMILY_INDEX,
                        std::vector<std::string> {std::string(kBinlogMetaPrefix) + suffix,
                                                  std::string(kBinlogDataPrefix) + suffix});
}
521
522
// Deletes the row-binlog entry keyed by
// make_row_binlog_key(tablet_uid, base_rowset_id, row_binlog_rowset_id).
Status RowsetMetaManager::remove_row_binlog(OlapMeta* meta, TabletUid tablet_uid,
                                            const RowsetId& base_rowset_id,
                                            const RowsetId& row_binlog_rowset_id) {
    const auto row_binlog_key =
            make_row_binlog_key(tablet_uid, base_rowset_id, row_binlog_rowset_id);
    return meta->remove(META_COLUMN_FAMILY_INDEX, row_binlog_key);
}
528
529
// Uses get_row_binlog_base_rowset_ids to build a map from base rowset id to
// row-binlog rowset id for `row_binlog_rowset_ids`. It then removes each mapped
// row-binlog entry, stopping at the first error.
// NOTE(review): the map holds at most one row-binlog id per base rowset — confirm that
// is the intended invariant.
Status RowsetMetaManager::remove_row_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                                  const std::set<RowsetId>& row_binlog_rowset_ids) {
    std::map<RowsetId, RowsetId> base_to_row_binlog;
    RETURN_IF_ERROR(get_row_binlog_base_rowset_ids(meta, tablet_uid, base_to_row_binlog,
                                                   row_binlog_rowset_ids));
    for (const auto& mapping : base_to_row_binlog) {
        RETURN_IF_ERROR(remove_row_binlog(meta, tablet_uid, mapping.first, mapping.second));
    }
    return Status::OK();
}
539
540
bool RowsetMetaManager::row_binlog_meta_exists(OlapMeta* meta, TabletUid tablet_uid,
541
21
                                               const RowsetId& row_binlog_rowset_id) {
542
21
    bool found = false;
543
21
    auto probe = [&found, &row_binlog_rowset_id](std::string_view key,
544
21
                                                 std::string_view /* value */) -> bool {
545
0
        std::vector<std::string> parts;
546
        // key format: binlog_row_uuid_{rowset_id}_{row_binlog_rowset_id}
547
0
        RETURN_IF_ERROR(split_string(key, '_', &parts));
548
0
        if (parts.size() != 5) {
549
0
            LOG(WARNING) << "invalid binlog<row> key:" << key << ", splitted size:" << parts.size();
550
0
            return true;
551
0
        }
552
0
        RowsetId id;
553
0
        id.init(parts[4]);
554
0
        if (id == row_binlog_rowset_id) {
555
0
            found = true;
556
0
            return false;
557
0
        }
558
0
        return true;
559
0
    };
560
21
    static_cast<void>(meta->iterate(META_COLUMN_FAMILY_INDEX,
561
21
                                    std::string(kRowBinlogPrefix) + tablet_uid.to_string(), probe));
562
21
    return found;
563
21
}
564
565
// Scans the binlog<row> keys of `tablet_uid`. For each key whose row-binlog rowset id is in
// `row_binlog_rowset_ids`, records "base rowset id -> row-binlog rowset id" in
// `base_rowset_id_to_row_binlog`. The map is not cleared first. Because std::map::emplace()
// does not overwrite, only the first row-binlog id seen for a base rowset id is kept.
// Malformed keys are logged and skipped.
Status RowsetMetaManager::get_row_binlog_base_rowset_ids(
        OlapMeta* meta, TabletUid tablet_uid,
        std::map<RowsetId, RowsetId>& base_rowset_id_to_row_binlog,
        const std::set<RowsetId>& row_binlog_rowset_ids) {
    const std::string scan_prefix = std::string(kRowBinlogPrefix) + tablet_uid.to_string();
    auto record_if_wanted = [&base_rowset_id_to_row_binlog, &row_binlog_rowset_ids](
                                    std::string_view key, std::string_view /* value */) -> bool {
        // key format: binlog_row_uuid_{rowset_id}_{row_binlog_rowset_id}
        std::vector<std::string> fields;
        RETURN_IF_ERROR(split_string(key, '_', &fields));
        if (fields.size() != 5) {
            LOG(WARNING) << "invalid binlog<row> key:" << key
                         << ", splitted size:" << fields.size();
            return true;
        }
        RowsetId row_binlog_id;
        row_binlog_id.init(fields[4]);
        if (!row_binlog_rowset_ids.contains(row_binlog_id)) {
            return true;
        }
        RowsetId base_id;
        base_id.init(fields[3]);
        base_rowset_id_to_row_binlog.emplace(base_id, row_binlog_id);
        return true;
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, scan_prefix, record_if_wanted);
}
593
594
// Re-keys every RowsetBinlogMetaPB in `metas_pb` for `tablet_uid`, overwriting its meta_key and
// data_key in place. Then writes all (meta_key, meta) and (data_key, data) pairs with a single
// OlapMeta::put() call.
// NOTE(review): `entries` is built from references into `metas_pb`. If OlapMeta::BatchEntry
// stores references, `metas_pb` must stay unmodified until put() returns. Confirm against
// OlapMeta.
Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                              RowsetBinlogMetasPB* metas_pb) {
    const auto tablet_uid_str = tablet_uid.to_string();
    auto* binlog_metas = metas_pb->mutable_rowset_binlog_metas();

    std::vector<OlapMeta::BatchEntry> entries;
    // Every binlog meta contributes exactly two entries (meta key + data key).
    entries.reserve(2 * static_cast<size_t>(binlog_metas->size()));

    for (auto& rowset_binlog_meta : *binlog_metas) {
        const auto& rowset_id = rowset_binlog_meta.rowset_id();
        const auto version = rowset_binlog_meta.version();

        auto* meta_key = rowset_binlog_meta.mutable_meta_key();
        *meta_key = make_binlog_meta_key(tablet_uid_str, version, rowset_id);
        auto* data_key = rowset_binlog_meta.mutable_data_key();
        *data_key = make_binlog_data_key(tablet_uid_str, version, rowset_id);

        entries.emplace_back(*meta_key, rowset_binlog_meta.meta());
        entries.emplace_back(*data_key, rowset_binlog_meta.data());
    }

    return meta->put(META_COLUMN_FAMILY_INDEX, entries);
}
614
615
// Iterates every rowset meta stored under ROWSET_PREFIX and invokes
// `func(tablet_uid, rowset_id, serialized_meta)` for each well-formed key. The value returned
// by `func` is handed back to OlapMeta::iterate() as the callback result. Keys that do not
// match the expected layout are logged and skipped.
Status RowsetMetaManager::traverse_rowset_metas(
        OlapMeta* meta,
        std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func) {
    auto traverse_rowset_meta_func = [&func](std::string_view key, std::string_view value) -> bool {
        std::vector<std::string> parts;
        // key format: rst_uuid_rowset_id
        RETURN_IF_ERROR(split_string(key, '_', &parts));
        if (parts.size() != 3) {
            LOG(WARNING) << "invalid rowset key:" << key << ", splitted size:" << parts.size();
            return true;
        }
        RowsetId rowset_id;
        rowset_id.init(parts[2]);
        std::vector<std::string> uid_parts;
        RETURN_IF_ERROR(split_string(parts[1], '-', &uid_parts));
        // The tablet uid must split into exactly two '-'-separated halves. Guard before
        // indexing so a malformed key cannot read past the end of uid_parts.
        if (uid_parts.size() != 2) {
            LOG(WARNING) << "invalid tablet uid in rowset key:" << key
                         << ", splitted size:" << uid_parts.size();
            return true;
        }
        TabletUid tablet_uid(uid_parts[0], uid_parts[1]);
        return func(tablet_uid, rowset_id, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func);
}
637
638
// Walks all binlog meta keys (kBinlogMetaPrefix), grouped by the tablet component between the
// prefix and the next '_'.
//
// For the first key of each tablet, `collector(key, value, true)` is called. Its result decides
// whether the tablet's remaining keys are passed on as `collector(key, value, false)`. When a
// tablet is not wanted, the callback returns false to stop the current iteration. The next
// pass then seeks to "<prefix><tablet>~". Since '~' (0x7E) sorts after '_' (0x5F), every
// "<prefix><tablet>_..." key of the skipped tablet is jumped over instead of being scanned.
//
// A new pass is started only when the callback stopped the iteration early. The former
// "some key was seen" condition always added one redundant pass. Worse, when a malformed key
// sorted after the last tablet, every pass restarted from the same position and the loop never
// ended.
Status RowsetMetaManager::traverse_binlog_metas(
        OlapMeta* meta,
        std::function<bool(std::string_view, std::string_view, bool)> const& collector) {
    // first:  start key of the next pass ("<prefix><last tablet>~", initially the bare prefix)
    // second: whether keys of the current tablet are still being collected
    std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false);
    bool stopped_early = false;
    Status status;
    auto traverse_binlog_meta_func = [&last_info, &stopped_early, &collector](
                                             std::string_view key, std::string_view value) -> bool {
        auto& [last_prefix, need_collect] = last_info;
        size_t pos = key.find('_', kBinlogMetaPrefix.size());
        if (pos == std::string_view::npos) {
            LOG(WARNING) << "invalid binlog meta key: " << key;
            return true;
        }
        // "<prefix><tablet>" of this key vs. of the previous tablet. last_prefix minus its last
        // character: the trailing '~', or the prefix's trailing char on the very first key.
        std::string_view key_view(key.data(), pos);
        std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1);

        if (last_prefix_view != key_view) {
            need_collect = collector(key, value, true);
            last_prefix = std::string(key_view) + "~";
        } else if (need_collect) {
            collector(key, value, false);
        }

        if (!need_collect) {
            // Stop here; the outer loop re-seeks past the rest of this tablet's keys.
            stopped_early = true;
        }
        return need_collect;
    };

    do {
        stopped_early = false;
        status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(),
                               traverse_binlog_meta_func);
    } while (status.ok() && stopped_early);

    return status;
}
674
675
// Iterates every binlog<row> key (kRowBinlogPrefix) and invokes
// `func(tablet_uid, base_rowset_id, row_binlog_rowset_id, value)` for each well-formed one.
// The value returned by `func` is handed back to OlapMeta::iterate(). Keys whose layout or
// tablet uid is malformed are logged and skipped.
Status RowsetMetaManager::traverse_row_binlog_metas(
        OlapMeta* meta, std::function<bool(const TabletUid&, const RowsetId&, const RowsetId&,
                                           const std::string&)> const& func) {
    auto visit_row_binlog_key = [&func](std::string_view key, std::string_view value) -> bool {
        // key format: binlog_row_uuid_{rowset_id}_{row_binlog_rowset_id}
        std::vector<std::string> key_fields;
        RETURN_IF_ERROR(split_string(key, '_', &key_fields));
        if (key_fields.size() != 5) {
            LOG(WARNING) << "invalid rowset key:" << key
                         << ", splitted size:" << key_fields.size();
            return true;
        }

        std::vector<std::string> uid_fields;
        RETURN_IF_ERROR(split_string(key_fields[2], '-', &uid_fields));
        if (uid_fields.size() != 2) {
            LOG(WARNING) << "invalid tablet uid in binlog<row> key:" << key
                         << ", splitted size:" << uid_fields.size();
            return true;
        }

        const TabletUid tablet_uid(uid_fields[0], uid_fields[1]);
        RowsetId base_rowset_id;
        base_rowset_id.init(key_fields[3]);
        RowsetId row_binlog_rowset_id;
        row_binlog_rowset_id.init(key_fields[4]);
        const std::string serialized_value(value);
        return func(tablet_uid, base_rowset_id, row_binlog_rowset_id, serialized_value);
    };
    const std::string scan_prefix(kRowBinlogPrefix);
    return meta->iterate(META_COLUMN_FAMILY_INDEX, scan_prefix, visit_row_binlog_key);
}
704
705
// Serializes `partial_update_info_pb` and stores it under
// "<PARTIAL_UPDATE_INFO_PREFIX><tablet_id>_<partition_id>_<txn_id>". Returns
// SERIALIZE_PROTOBUF_ERROR if serialization fails, otherwise the status of the put().
Status RowsetMetaManager::save_partial_update_info(
        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
        const PartialUpdateInfoPB& partial_update_info_pb) {
    const std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string serialized;
    const bool serialized_ok = partial_update_info_pb.SerializeToString(&serialized);
    if (serialized_ok) {
        VLOG_NOTICE << "save partial update info, key=" << key
                    << ", value_size=" << serialized.size();
        return meta->put(META_COLUMN_FAMILY_INDEX, key, serialized);
    }
    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize partial update info failed. key={}",
                                                   key);
}
718
719
// Loads the PartialUpdateInfoPB stored for (tablet_id, partition_id, txn_id) into
// `partial_update_info_pb`.
// Returns:
//  - the META_KEY_NOT_FOUND status unchanged (and without logging) when no entry exists;
//  - the underlying error, after logging it, for any other read failure;
//  - PARSE_PROTOBUF_ERROR if the stored bytes cannot be parsed;
//  - OK otherwise.
Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                      int64_t partition_id, int64_t txn_id,
                                                      PartialUpdateInfoPB* partial_update_info_pb) {
    std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    std::string value;
    Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
    if (status.is<META_KEY_NOT_FOUND>()) {
        return status;
    }
    if (!status.ok()) {
        // Include the underlying error so the warning says why the read failed.
        LOG_WARNING(
                "failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}, "
                "status={}",
                tablet_id, partition_id, txn_id, status.to_string());
        return status;
    }
    if (!partial_update_info_pb->ParseFromString(value)) {
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
                "fail to parse partial update info content to protobuf object. tablet_id={}, "
                "partition_id={}, txn_id={}",
                tablet_id, partition_id, txn_id);
    }
    return Status::OK();
}
742
743
// Iterates every partial update info entry (PARTIAL_UPDATE_INFO_PREFIX) and invokes
// `func(tablet_id, partition_id, txn_id, serialized_value)` for each well-formed key. The value
// returned by `func` is handed back to OlapMeta::iterate().
// Some keys are logged and skipped:
//  - keys that do not split into four '_'-separated parts;
//  - keys whose id components std::stoll rejects.
// Skipping means std::invalid_argument / std::out_of_range no longer propagate out of the
// iteration callback.
Status RowsetMetaManager::traverse_partial_update_info(
        OlapMeta* meta,
        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
    auto traverse_partial_update_info_func = [&func](std::string_view key,
                                                     std::string_view value) -> bool {
        std::vector<std::string> parts;
        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
        RETURN_IF_ERROR(split_string(key, '_', &parts));
        if (parts.size() != 4) {
            LOG_WARNING("invalid partial update info key={}, splitted size={}", key, parts.size());
            return true;
        }
        int64_t tablet_id = 0;
        int64_t partition_id = 0;
        int64_t txn_id = 0;
        try {
            tablet_id = std::stoll(parts[1]);
            partition_id = std::stoll(parts[2]);
            txn_id = std::stoll(parts[3]);
        } catch (const std::exception& e) {
            // std::stoll throws std::invalid_argument / std::out_of_range on malformed ids.
            LOG_WARNING("invalid partial update info key={}, error={}", key, e.what());
            return true;
        }
        return func(tablet_id, partition_id, txn_id, value);
    };
    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
                         traverse_partial_update_info_func);
}
763
764
// Deletes the partial update info stored for (tablet_id, partition_id, txn_id) and returns the
// status reported by the meta store.
Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
                                                     int64_t partition_id, int64_t txn_id) {
    const std::string key =
            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, key);
    VLOG_NOTICE << "remove partial update info, key=" << key;
    return remove_status;
}
772
773
// Deletes the partial update info of every (tablet_id, partition_id, txn_id) tuple in `keys`
// with a single OlapMeta::remove() call, and returns its status.
Status RowsetMetaManager::remove_partial_update_infos(
        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
    std::vector<std::string> remove_keys;
    // Exactly one key per tuple: size the vector once instead of growing it in the loop.
    remove_keys.reserve(keys.size());
    for (const auto& [tablet_id, partition_id, txn_id] : keys) {
        remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
                                          partition_id, txn_id));
    }
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
    return res;
}
784
785
// Deletes every partial update info entry that belongs to `tablet_id`.
//
// Entries are keyed "<PARTIAL_UPDATE_INFO_PREFIX><tablet_id>_<partition_id>_<txn_id>", so the
// scan prefix must end with the '_' separator. Without it, tablet 12 would also match, and
// delete, the entries of tablets 123, 1200, and so on.
Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
                                                                    int64_t tablet_id) {
    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
    std::vector<std::string> remove_keys;
    auto get_remove_keys_func = [&remove_keys](std::string_view key,
                                               std::string_view /* value */) -> bool {
        remove_keys.emplace_back(key);
        return true;
    };
    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
    // Log after the scan so the reported size reflects the keys actually collected.
    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
                << " removed keys size: " << remove_keys.size();
    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
}
798
} // namespace doris