Coverage Report

Created: 2026-05-21 00:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/tablet/tablet_meta_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/tablet/tablet_meta_manager.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
#include <rocksdb/db.h>
23
#include <rocksdb/write_batch.h>
24
25
#include <boost/algorithm/string/trim.hpp>
26
#include <fstream>
27
#include <string>
28
#include <unordered_set>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "common/status.h"
33
#include "json2pb/json_to_pb.h"
34
#include "json2pb/pb_to_json.h"
35
#include "storage/data_dir.h"
36
#include "storage/olap_define.h"
37
#include "storage/olap_meta.h"
38
#include "storage/utils.h"
39
40
namespace rocksdb {
41
class Iterator;
42
class Slice;
43
class Status;
44
struct ColumnFamilyOptions;
45
struct DBOptions;
46
struct ReadOptions;
47
struct WriteOptions;
48
} // namespace rocksdb
49
namespace doris {
50
using namespace ErrorCode;
51
52
// should use tablet->generate_tablet_meta_copy() method to get a copy of current tablet meta
53
// there are some rowset meta in local meta store and in in-memory tablet meta
54
// but not in tablet meta in local meta store
55
Status TabletMetaManager::get_meta(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
56
467
                                   TabletMetaSharedPtr tablet_meta) {
57
467
    OlapMeta* meta = store->get_meta();
58
467
    std::stringstream key_stream;
59
467
    key_stream << HEADER_PREFIX << tablet_id << "_" << schema_hash;
60
467
    std::string key = key_stream.str();
61
467
    std::string value;
62
467
    Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value);
63
467
    if (s.is<META_KEY_NOT_FOUND>()) {
64
11
        return Status::Error<META_KEY_NOT_FOUND>("tablet_id:{}, schema_hash:{}, not found.",
65
11
                                                 tablet_id, schema_hash);
66
456
    } else if (!s.ok()) {
67
0
        LOG(WARNING) << "load tablet_id:" << tablet_id << ", schema_hash:" << schema_hash
68
0
                     << " failed.";
69
0
        return s;
70
0
    }
71
456
    return tablet_meta->deserialize(value);
72
467
}
73
74
Status TabletMetaManager::get_json_meta(DataDir* store, TTabletId tablet_id,
75
2
                                        TSchemaHash schema_hash, std::string* json_meta) {
76
2
    TabletMetaSharedPtr tablet_meta(new TabletMeta());
77
2
    Status s = get_meta(store, tablet_id, schema_hash, tablet_meta);
78
2
    if (!s.ok()) {
79
0
        return s;
80
0
    }
81
2
    json2pb::Pb2JsonOptions json_options;
82
2
    json_options.pretty_json = true;
83
2
    tablet_meta->to_json(json_meta, json_options);
84
2
    return Status::OK();
85
2
}
86
87
// TODO(ygl):
88
// 1. if term > 0 then save to remote meta store first using term
89
// 2. save to local meta store
90
Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
91
1
                               TabletMetaSharedPtr tablet_meta, std::string_view header_prefix) {
92
1
    std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash);
93
1
    std::string value;
94
1
    tablet_meta->serialize(&value);
95
1
    if (tablet_meta->partition_id() <= 0) {
96
0
        LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << " tablet "
97
0
                     << tablet_meta->tablet_id();
98
        // TODO(dx): after fix partition id eq 0 bug, fix it
99
        // return Status::InternalError("invaid partition id {} tablet {}",
100
        //  tablet_meta->partition_id(), tablet_meta->tablet_id());
101
0
    }
102
1
    OlapMeta* meta = store->get_meta();
103
1
    VLOG_NOTICE << "save tablet meta"
104
1
                << ", key:" << key << ", meta length:" << value.length();
105
1
    return meta->put(META_COLUMN_FAMILY_INDEX, key, value);
106
1
}
107
108
Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
109
583
                               const std::string& meta_binary, std::string_view header_prefix) {
110
583
    std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash);
111
583
    OlapMeta* meta = store->get_meta();
112
583
    VLOG_NOTICE << "save tablet meta "
113
483
                << ", key:" << key << " meta_size=" << meta_binary.length();
114
583
    return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary);
115
583
}
116
117
// TODO(ygl):
118
// 1. remove load data first
119
// 2. remove from load meta store using term if term > 0
120
Status TabletMetaManager::remove(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash,
121
226
                                 std::string_view header_prefix) {
122
226
    std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash);
123
226
    OlapMeta* meta = store->get_meta();
124
226
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
125
226
    VLOG_NOTICE << "remove tablet_meta, key:" << key << ", res:" << res;
126
226
    return res;
127
226
}
128
129
Status TabletMetaManager::traverse_headers(
130
        OlapMeta* meta, std::function<bool(long, long, std::string_view)> const& func,
131
100
        std::string_view header_prefix) {
132
100
    auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool {
133
0
        std::vector<std::string> parts;
134
        // old format key format: "hdr_" + tablet_id + "_" + schema_hash  0.11
135
        // new format key format: "tabletmeta_" + tablet_id + "_" + schema_hash  0.10
136
0
        static_cast<void>(split_string(key, '_', &parts));
137
0
        if (parts.size() != 3) {
138
0
            LOG(WARNING) << "invalid tablet_meta key:" << key << ", split size:" << parts.size();
139
0
            return true;
140
0
        }
141
0
        TTabletId tablet_id = std::stol(parts[1], nullptr, 10);
142
0
        TSchemaHash schema_hash = cast_set<int32_t>(std::stol(parts[2], nullptr, 10));
143
0
        return func(tablet_id, schema_hash, value);
144
0
    };
145
100
    Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, header_prefix, traverse_header_func);
146
100
    return status;
147
100
}
148
149
2
Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta_path) {
150
2
    std::ifstream infile(meta_path);
151
2
    if (!infile.is_open()) {
152
1
        return Status::InternalError("Failed to open file {}", meta_path);
153
1
    }
154
155
1
    std::string json_meta((std::istreambuf_iterator<char>(infile)),
156
1
                          std::istreambuf_iterator<char>());
157
1
    boost::algorithm::trim(json_meta);
158
1
    TabletMetaPB tablet_meta_pb;
159
1
    std::string error;
160
1
    bool ret = json2pb::JsonToProtoMessage(json_meta, &tablet_meta_pb, &error);
161
1
    if (!ret) {
162
0
        return Status::Error<HEADER_LOAD_JSON_HEADER>("JSON to protobuf message failed: {}", error);
163
0
    }
164
165
1
    std::string meta_binary;
166
1
    tablet_meta_pb.SerializeToString(&meta_binary);
167
1
    TTabletId tablet_id = tablet_meta_pb.tablet_id();
168
1
    TSchemaHash schema_hash = tablet_meta_pb.schema_hash();
169
1
    return save(store, tablet_id, schema_hash, meta_binary);
170
1
}
171
172
Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id,
173
                                                    int64_t publish_version,
174
2.03k
                                                    const std::string& meta_binary) {
175
2.03k
    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
176
2.03k
    OlapMeta* meta = store->get_meta();
177
2.03k
    LOG(INFO) << "save pending publish rowset, key:" << key
178
2.03k
              << " meta_size=" << meta_binary.length();
179
2.03k
    return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary);
180
2.03k
}
181
182
Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
183
37
                                                      int64_t publish_version) {
184
37
    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
185
37
    OlapMeta* meta = store->get_meta();
186
37
    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
187
37
    LOG(INFO) << "remove pending publish_info, key:" << key << ", res:" << res;
188
37
    return res;
189
37
}
190
191
Status TabletMetaManager::traverse_pending_publish(
192
51
        OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) {
193
51
    auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool {
194
7
        std::vector<std::string> parts;
195
        // key format: "ppi_" + tablet_id + "_" + publish_version
196
7
        static_cast<void>(split_string(key, '_', &parts));
197
7
        if (parts.size() != 3) {
198
0
            LOG(WARNING) << "invalid pending publish info key:" << key
199
0
                         << ", split size:" << parts.size();
200
0
            return true;
201
0
        }
202
7
        int64_t tablet_id = std::stol(parts[1], nullptr, 10);
203
7
        int64_t version = std::stol(parts[2], nullptr, 10);
204
7
        return func(tablet_id, version, value);
205
7
    };
206
51
    Status status =
207
51
            meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, traverse_header_func);
208
51
    return status;
209
51
}
210
211
305
std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id, int64_t version) {
212
305
    std::string key;
213
305
    key.reserve(20);
214
305
    key.append(DELETE_BITMAP);
215
305
    put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id));
216
305
    put_fixed64_le(&key, to_endian<std::endian::big>(version));
217
305
    return key;
218
305
}
219
220
3
std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id) {
221
3
    std::string key;
222
3
    key.reserve(12);
223
3
    key.append(DELETE_BITMAP);
224
3
    put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id));
225
3
    return key;
226
3
}
227
228
void NO_SANITIZE_UNDEFINED TabletMetaManager::decode_delete_bitmap_key(std::string_view enc_key,
229
                                                                       TTabletId* tablet_id,
230
599
                                                                       int64_t* version) {
231
599
    DCHECK_EQ(enc_key.size(), 20);
232
599
    *tablet_id = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 4));
233
599
    *version = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 12));
234
599
}
235
236
Status TabletMetaManager::save_delete_bitmap(DataDir* store, TTabletId tablet_id,
237
300
                                             DeleteBitmapPtr delete_bitmap, int64_t version) {
238
300
    return save_delete_bitmap(store, tablet_id, std::move(delete_bitmap), nullptr, version);
239
300
}
240
241
Status TabletMetaManager::save_delete_bitmap(DataDir* store, TTabletId tablet_id,
242
                                             DeleteBitmapPtr delete_bitmap,
243
303
                                             DeleteBitmapPtr binlog_delvec, int64_t version) {
244
303
    VLOG_NOTICE << "save delete bitmap, tablet_id:" << tablet_id << ", version: " << version;
245
303
    if ((delete_bitmap == nullptr || delete_bitmap->delete_bitmap.empty()) &&
246
303
        (binlog_delvec == nullptr || binlog_delvec->delete_bitmap.empty())) {
247
2
        return Status::OK();
248
2
    }
249
301
    OlapMeta* meta = store->get_meta();
250
301
    DeleteBitmapPB delete_bitmap_pb;
251
252
    // Normal delete bitmap.
253
301
    if (delete_bitmap != nullptr) {
254
7.50k
        for (auto& [id, bitmap] : delete_bitmap->delete_bitmap) {
255
7.50k
            auto& rowset_id = std::get<0>(id);
256
7.50k
            auto segment_id = std::get<1>(id);
257
7.50k
            delete_bitmap_pb.add_rowset_ids(rowset_id.to_string());
258
7.50k
            delete_bitmap_pb.add_segment_ids(segment_id);
259
7.50k
            std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
260
7.50k
            bitmap.write(bitmap_data.data());
261
7.50k
            *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data);
262
7.50k
            delete_bitmap_pb.add_is_binlog_delvec(false);
263
7.50k
        }
264
301
    }
265
266
    // Binlog delvec.
267
301
    if (binlog_delvec != nullptr) {
268
0
        for (auto& [id, bitmap] : binlog_delvec->delete_bitmap) {
269
0
            auto& rowset_id = std::get<0>(id);
270
0
            auto segment_id = std::get<1>(id);
271
0
            delete_bitmap_pb.add_rowset_ids(rowset_id.to_string());
272
0
            delete_bitmap_pb.add_segment_ids(segment_id);
273
0
            std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
274
0
            bitmap.write(bitmap_data.data());
275
0
            *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data);
276
0
            delete_bitmap_pb.add_is_binlog_delvec(true);
277
0
        }
278
0
    }
279
280
301
    std::string key = encode_delete_bitmap_key(tablet_id, version);
281
301
    std::string val;
282
301
    bool ok = delete_bitmap_pb.SerializeToString(&val);
283
301
    if (!ok) {
284
0
        auto msg = fmt::format("failed to serialize delete bitmap, tablet_id: {}, version: {}",
285
0
                               tablet_id, version);
286
0
        LOG(WARNING) << msg;
287
0
        return Status::InternalError(msg);
288
0
    }
289
301
    return meta->put(META_COLUMN_FAMILY_INDEX, key, val);
290
301
}
291
292
Status TabletMetaManager::traverse_delete_bitmap(
293
54
        OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) {
294
598
    auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool {
295
598
        TTabletId tablet_id;
296
598
        int64_t version;
297
598
        decode_delete_bitmap_key(key, &tablet_id, &version);
298
598
        VLOG_NOTICE << "traverse delete bitmap, tablet_id: " << tablet_id
299
598
                    << ", version: " << version;
300
598
        return func(tablet_id, version, value);
301
598
    };
302
54
    return meta->iterate(META_COLUMN_FAMILY_INDEX, DELETE_BITMAP, traverse_header_func);
303
54
}
304
305
Status TabletMetaManager::remove_old_version_delete_bitmap(DataDir* store, TTabletId tablet_id,
306
3
                                                           int64_t version) {
307
3
    OlapMeta* meta = store->get_meta();
308
3
    std::string begin_key = encode_delete_bitmap_key(tablet_id);
309
3
    std::string end_key = encode_delete_bitmap_key(tablet_id, version);
310
311
3
    std::vector<std::string> remove_keys;
312
302
    auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool {
313
        // include end_key
314
302
        if (key > end_key) {
315
2
            return false;
316
2
        }
317
300
        remove_keys.emplace_back(key);
318
300
        return true;
319
302
    };
320
3
    LOG(INFO) << "remove old version delete bitmap, tablet_id: " << tablet_id
321
3
              << " version: " << version << ", removed keys size: " << remove_keys.size();
322
3
    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, get_remove_keys_func));
323
3
    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
324
3
}
325
326
} // namespace doris