Coverage Report

Created: 2025-07-25 21:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/olap_meta.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/olap_meta.h"
19
20
#include <bvar/latency_recorder.h>
21
#include <fmt/format.h>
22
#include <fmt/ranges.h>
23
#include <rocksdb/env.h>
24
#include <rocksdb/iterator.h>
25
#include <rocksdb/status.h>
26
#include <rocksdb/write_batch.h>
27
#include <stddef.h>
28
#include <stdint.h>
29
30
#include <memory>
31
#include <sstream>
32
#include <vector>
33
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "olap/olap_define.h"
37
#include "rocksdb/convenience.h"
38
#include "rocksdb/db.h"
39
#include "rocksdb/options.h"
40
#include "rocksdb/slice.h"
41
#include "rocksdb/slice_transform.h"
42
#include "util/defer_op.h"
43
#include "util/doris_metrics.h"
44
#include "util/runtime_profile.h"
45
46
using rocksdb::DB;
47
using rocksdb::DBOptions;
48
using rocksdb::ColumnFamilyDescriptor;
49
using rocksdb::ColumnFamilyOptions;
50
using rocksdb::ReadOptions;
51
using rocksdb::WriteOptions;
52
using rocksdb::Iterator;
53
using rocksdb::NewFixedPrefixTransform;
54
55
namespace doris {
56
using namespace ErrorCode;
57
const std::string META_POSTFIX = "/meta";
58
const size_t PREFIX_LENGTH = 4;
59
60
bvar::LatencyRecorder g_meta_put_latency("meta_put");
61
bvar::LatencyRecorder g_meta_get_latency("meta_get");
62
bvar::LatencyRecorder g_meta_remove_latency("meta_remove");
63
64
196
// Only remembers the storage root; the database itself is opened by init().
OlapMeta::OlapMeta(const std::string& root_path)
        : _root_path(root_path) {
}

// Defaulted: teardown is left to the members' own destructors (init()
// installs a deleter on _db that syncs, closes and deletes the database).
OlapMeta::~OlapMeta() = default;
67
68
class RocksdbLogger : public rocksdb::Logger {
69
public:
70
57.1k
    void Logv(const char* format, va_list ap) override {
71
57.1k
        char buf[1024];
72
57.1k
        vsnprintf(buf, sizeof(buf), format, ap);
73
57.1k
        LOG(INFO) << "[Rocksdb] " << buf;
74
57.1k
    }
75
};
76
77
196
// Opens the RocksDB database located at `_root_path + META_POSTFIX`, creating
// the database and any missing column families on demand.
//
// Three column families are requested, in this order: DEFAULT_COLUMN_FAMILY,
// DORIS_COLUMN_FAMILY and META_COLUMN_FAMILY. The resulting handles are
// appended to `_handles` in the same order, which is what the
// `column_family_index` argument of the other methods indexes into.
//
// Returns META_OPEN_DB_ERROR if the open fails, Status::OK() otherwise.
Status OlapMeta::init() {
    using DbPtr = std::unique_ptr<rocksdb::DB, std::function<void(rocksdb::DB*)>>;

    // Database-wide options; RocksDB's own log goes through RocksdbLogger and
    // is limited to WARN and above.
    DBOptions db_options;
    db_options.IncreaseParallelism();
    db_options.create_if_missing = true;
    db_options.create_missing_column_families = true;
    db_options.info_log = std::make_shared<RocksdbLogger>();
    db_options.info_log_level = rocksdb::WARN_LEVEL;

    // meta column family add prefix extractor to improve performance and ensure correctness
    ColumnFamilyOptions meta_cf_options;
    meta_cf_options.max_write_buffer_number = config::rocksdb_max_write_buffer_number;
    meta_cf_options.prefix_extractor.reset(NewFixedPrefixTransform(PREFIX_LENGTH));

    std::vector<ColumnFamilyDescriptor> cf_descriptors;
    // default column family is required
    cf_descriptors.emplace_back(DEFAULT_COLUMN_FAMILY, ColumnFamilyOptions());
    cf_descriptors.emplace_back(DORIS_COLUMN_FAMILY, ColumnFamilyOptions());
    cf_descriptors.emplace_back(META_COLUMN_FAMILY, meta_cf_options);

    const std::string db_path = _root_path + META_POSTFIX;
    rocksdb::DB* raw_db = nullptr;
    std::vector<rocksdb::ColumnFamilyHandle*> raw_handles;
    rocksdb::Status open_status =
            DB::Open(db_options, db_path, cf_descriptors, &raw_handles, &raw_db);

    // Deleter run when _db releases a non-null database: sync the WAL, stop
    // background work (waiting for it), close, then free. Failures while
    // syncing or closing are logged but do not prevent the delete.
    auto close_db = [](rocksdb::DB* db) {
        rocksdb::Status s = db->SyncWAL();
        if (!s.ok()) {
            LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
        }
        rocksdb::CancelAllBackgroundWork(db, true);
        s = db->Close();
        if (!s.ok()) {
            LOG(WARNING) << "rocksdb close failed: " << s.ToString();
        }
        LOG(INFO) << "finish close rocksdb for OlapMeta";

        delete db;
    };
    _db = DbPtr(raw_db, close_db);

    // Take ownership of the column family handles returned by Open.
    for (rocksdb::ColumnFamilyHandle* raw_handle : raw_handles) {
        _handles.emplace_back(raw_handle);
    }

    const bool opened = open_status.ok() && _db != nullptr;
    if (!opened) {
        return Status::Error<META_OPEN_DB_ERROR>("rocks db open failed, reason: {}",
                                                 open_status.ToString());
    }
    return Status::OK();
}
123
124
1.04k
// Reads the value stored under `key` in the column family selected by
// `column_family_index` (an index into `_handles`) and writes it to `*value`.
//
// The time spent in the RocksDB call is fed to g_meta_get_latency.
//
// Returns META_KEY_NOT_FOUND when RocksDB reports the key as missing,
// META_GET_ERROR for any other failure, and Status::OK() on success.
Status OlapMeta::get(const int column_family_index, const std::string& key, std::string* value) {
    auto* cf_handle = _handles[column_family_index].get();

    int64_t elapsed_ns = 0;
    rocksdb::Status read_status;
    {
        // The timer writes elapsed_ns when this scope ends.
        SCOPED_RAW_TIMER(&elapsed_ns);
        read_status = _db->Get(ReadOptions(), cf_handle, rocksdb::Slice(key), value);
    }
    g_meta_get_latency << (elapsed_ns / 1000);

    if (read_status.IsNotFound()) {
        return Status::Error<META_KEY_NOT_FOUND>("OlapMeta::get meet not found key");
    }
    if (!read_status.ok()) {
        return Status::Error<META_GET_ERROR>("rocks db get failed, key: {}, reason: {}", key,
                                             read_status.ToString());
    }
    return Status::OK();
}
141
142
bool OlapMeta::key_may_exist(const int column_family_index, const std::string& key,
143
6
                             std::string* value) {
144
6
    auto& handle = _handles[column_family_index];
145
6
    int64_t duration_ns = 0;
146
6
    bool is_exist = false;
147
6
    {
148
6
        SCOPED_RAW_TIMER(&duration_ns);
149
6
        is_exist = _db->KeyMayExist(ReadOptions(), handle.get(), rocksdb::Slice(key), value);
150
6
    }
151
6
    g_meta_get_latency << (duration_ns / 1000);
152
153
6
    return is_exist;
154
6
}
155
156
// Stores `value` under `key` in the column family selected by
// `column_family_index`. Whether the write is synced is taken from
// config::sync_tablet_meta.
//
// The call duration is recorded in g_meta_put_latency.
//
// Returns META_PUT_ERROR if RocksDB rejects the write, Status::OK() otherwise.
Status OlapMeta::put(const int column_family_index, const std::string& key,
                     const std::string& value) {
    // log all params
    VLOG_DEBUG << "column_family_index: " << column_family_index << ", key: " << key
               << ", value: " << value;

    auto* cf_handle = _handles[column_family_index].get();
    rocksdb::Status write_status;
    {
        int64_t elapsed_ns = 0;
        // Declared before the timer so that, at scope exit, the timer is
        // destroyed first (filling elapsed_ns) and the recorder sees the value.
        Defer record_latency([&] { g_meta_put_latency << (elapsed_ns / 1000); });
        SCOPED_RAW_TIMER(&elapsed_ns);

        WriteOptions sync_options;
        sync_options.sync = config::sync_tablet_meta;
        write_status =
                _db->Put(sync_options, cf_handle, rocksdb::Slice(key), rocksdb::Slice(value));
    }

    if (write_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_PUT_ERROR>("rocks db put failed, key: {}, reason: {}", key,
                                         write_status.ToString());
}
180
181
4
// Writes every (key, value) pair in `entries` to the column family selected
// by `column_family_index` as a single rocksdb::WriteBatch. Whether the write
// is synced is taken from config::sync_tablet_meta.
//
// The duration of building and submitting the batch is recorded in
// g_meta_put_latency.
//
// Returns META_PUT_ERROR if RocksDB rejects the batch, Status::OK() otherwise.
Status OlapMeta::put(const int column_family_index, const std::vector<BatchEntry>& entries) {
    auto* handle = _handles[column_family_index].get();
    rocksdb::Status s;
    {
        int64_t duration_ns = 0;
        // Declared before the timer so that, at scope exit, the timer is
        // destroyed first (filling duration_ns) and the recorder sees the value.
        Defer defer([&] { g_meta_put_latency << (duration_ns / 1000); });
        SCOPED_RAW_TIMER(&duration_ns);

        // construct write batch
        rocksdb::WriteBatch write_batch;
        // Iterate by const reference: the entries are only read here, so
        // copying each BatchEntry per iteration is unnecessary work.
        for (const auto& entry : entries) {
            VLOG_DEBUG << "column_family_index: " << column_family_index << ", key: " << entry.key
                       << ", value: " << entry.value;
            write_batch.Put(handle, rocksdb::Slice(entry.key), rocksdb::Slice(entry.value));
        }

        // write to rocksdb
        WriteOptions write_options;
        write_options.sync = config::sync_tablet_meta;
        s = _db->Write(write_options, &write_batch);
    }

    if (!s.ok()) {
        return Status::Error<META_PUT_ERROR>("rocks db put failed, reason: {}", s.ToString());
    }
    return Status::OK();
}
208
209
0
// Submits a caller-built write batch to the database. Whether the write is
// synced is taken from config::sync_tablet_meta. `batch` is passed straight
// to rocksdb::DB::Write and is not checked for null here.
//
// The call duration is recorded in g_meta_put_latency.
//
// Returns META_PUT_ERROR if RocksDB rejects the batch, Status::OK() otherwise.
Status OlapMeta::put(rocksdb::WriteBatch* batch) {
    rocksdb::Status write_status;
    {
        int64_t elapsed_ns = 0;
        // Declared before the timer so that, at scope exit, the timer is
        // destroyed first (filling elapsed_ns) and the recorder sees the value.
        Defer record_latency([&] { g_meta_put_latency << (elapsed_ns / 1000); });
        SCOPED_RAW_TIMER(&elapsed_ns);

        WriteOptions sync_options;
        sync_options.sync = config::sync_tablet_meta;
        write_status = _db->Write(sync_options, batch);
    }

    if (write_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_PUT_ERROR>("rocks db put failed, reason: {}",
                                         write_status.ToString());
}
226
227
546
// Deletes `key` from the column family selected by `column_family_index`.
// Whether the write is synced is taken from config::sync_tablet_meta.
//
// The call duration is recorded in g_meta_remove_latency.
//
// Returns META_DELETE_ERROR if RocksDB reports a failure, Status::OK()
// otherwise.
Status OlapMeta::remove(const int column_family_index, const std::string& key) {
    auto* cf_handle = _handles[column_family_index].get();

    int64_t elapsed_ns = 0;
    rocksdb::Status delete_status;
    {
        // The timer writes elapsed_ns when this scope ends.
        SCOPED_RAW_TIMER(&elapsed_ns);
        WriteOptions sync_options;
        sync_options.sync = config::sync_tablet_meta;
        delete_status = _db->Delete(sync_options, cf_handle, rocksdb::Slice(key));
    }
    g_meta_remove_latency << (elapsed_ns / 1000);

    if (delete_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_DELETE_ERROR>("rocks db delete key: {}, failed, reason: {}", key,
                                            delete_status.ToString());
}
244
245
10
// Deletes every key in `keys` from the column family selected by
// `column_family_index`, submitting all deletions as one rocksdb::WriteBatch.
// Whether the write is synced is taken from config::sync_tablet_meta.
//
// The duration of building and submitting the batch is recorded in
// g_meta_remove_latency.
//
// Returns META_DELETE_ERROR (listing the keys) if RocksDB rejects the batch,
// Status::OK() otherwise.
Status OlapMeta::remove(const int column_family_index, const std::vector<std::string>& keys) {
    auto* cf_handle = _handles[column_family_index].get();

    int64_t elapsed_ns = 0;
    rocksdb::Status delete_status;
    {
        // The timer writes elapsed_ns when this scope ends.
        SCOPED_RAW_TIMER(&elapsed_ns);
        WriteOptions sync_options;
        sync_options.sync = config::sync_tablet_meta;

        rocksdb::WriteBatch deletions;
        for (const std::string& doomed_key : keys) {
            deletions.Delete(cf_handle, rocksdb::Slice(doomed_key));
        }
        delete_status = _db->Write(sync_options, &deletions);
    }
    g_meta_remove_latency << (elapsed_ns / 1000);

    if (delete_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_DELETE_ERROR>("rocks db delete keys:{} failed, reason:{}", keys,
                                            delete_status.ToString());
}
266
267
// Convenience overload: visits the entries whose key starts with `prefix`,
// beginning the scan at `prefix` itself. Equivalent to calling the four-argument
// overload with `prefix` as both the seek key and the prefix filter.
Status OlapMeta::iterate(const int column_family_index, std::string_view prefix,
                         std::function<bool(std::string_view, std::string_view)> const& func) {
    const std::string_view seek_key = prefix;
    return iterate(column_family_index, seek_key, prefix, func);
}
271
272
// Scans the column family selected by `column_family_index` in iterator order
// and invokes `func(key, value)` for each visited entry.
//
//   seek_key - where the scan starts; an empty value starts at the first entry.
//   prefix   - when non-empty, the scan ends (successfully) at the first key
//              that does not start with it; when empty, no prefix filtering
//              is applied.
//   func     - visitor; returning false stops the scan early. The views it
//              receives wrap the iterator's current key/value, so presumably
//              they should not be retained after the callback returns.
//
// Returns META_ITERATOR_ERROR if the initial seek fails or the iterator
// reports an error after the loop ends, Status::OK() otherwise.
Status OlapMeta::iterate(const int column_family_index, std::string_view seek_key,
                         std::string_view prefix,
                         std::function<bool(std::string_view, std::string_view)> const& func) {
    auto& handle = _handles[column_family_index];
    std::unique_ptr<Iterator> it(_db->NewIterator(ReadOptions(), handle.get()));
    if (seek_key.empty()) {
        it->SeekToFirst();
    } else {
        it->Seek(rocksdb::Slice(seek_key.data(), seek_key.size()));
    }
    rocksdb::Status seek_status = it->status();
    if (!seek_status.ok()) {
        return Status::Error<META_ITERATOR_ERROR>("rocksdb seek failed. reason: {}",
                                                  seek_status.ToString());
    }

    for (; it->Valid(); it->Next()) {
        if (!prefix.empty()) {
            // Left the requested key range: nothing more to visit.
            if (!it->key().starts_with(rocksdb::Slice(prefix.data(), prefix.size()))) {
                return Status::OK();
            }
        }
        bool keep_going = func({it->key().data(), it->key().size()},
                               {it->value().data(), it->value().size()});
        if (!keep_going) {
            break;
        }
    }

    // Re-read the iterator status here. The status captured right after the
    // seek was already verified OK, so reporting it would hide the real reason
    // the scan failed.
    rocksdb::Status scan_status = it->status();
    if (!scan_status.ok()) {
        return Status::Error<META_ITERATOR_ERROR>("rocksdb iterator failed. reason: {}",
                                                  scan_status.ToString());
    }

    return Status::OK();
}
307
308
} // namespace doris