Coverage Report

Created: 2025-04-10 14:34

/root/doris/be/src/olap/olap_meta.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/olap_meta.h"
19
20
#include <bvar/latency_recorder.h>
21
#include <fmt/format.h>
22
#include <fmt/ranges.h>
23
#include <rocksdb/env.h>
24
#include <rocksdb/iterator.h>
25
#include <rocksdb/status.h>
26
#include <rocksdb/write_batch.h>
27
#include <stddef.h>
28
#include <stdint.h>
29
30
#include <memory>
31
#include <sstream>
32
#include <vector>
33
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "olap/olap_define.h"
37
#include "rocksdb/convenience.h"
38
#include "rocksdb/db.h"
39
#include "rocksdb/options.h"
40
#include "rocksdb/slice.h"
41
#include "rocksdb/slice_transform.h"
42
#include "util/defer_op.h"
43
#include "util/doris_metrics.h"
44
#include "util/runtime_profile.h"
45
46
using rocksdb::DB;
47
using rocksdb::DBOptions;
48
using rocksdb::ColumnFamilyDescriptor;
49
using rocksdb::ColumnFamilyOptions;
50
using rocksdb::ReadOptions;
51
using rocksdb::WriteOptions;
52
using rocksdb::Iterator;
53
using rocksdb::NewFixedPrefixTransform;
54
55
namespace doris {
56
using namespace ErrorCode;
// Suffix appended to the store root path to form the RocksDB directory (see OlapMeta::init).
const std::string META_POSTFIX = "/meta";
// Fixed key-prefix length handed to NewFixedPrefixTransform for the meta column family.
const size_t PREFIX_LENGTH = 4;

// Latency recorders for meta operations. Callers feed them elapsed nanoseconds / 1000,
// i.e. microseconds.
bvar::LatencyRecorder g_meta_put_latency("meta_put");
bvar::LatencyRecorder g_meta_get_latency("meta_get");
bvar::LatencyRecorder g_meta_remove_latency("meta_remove");
63
64
91
OlapMeta::OlapMeta(const std::string& root_path) : _root_path(root_path) {}
65
66
91
OlapMeta::~OlapMeta() = default;
67
68
class RocksdbLogger : public rocksdb::Logger {
69
public:
70
26.5k
    void Logv(const char* format, va_list ap) override {
71
26.5k
        char buf[1024];
72
26.5k
        vsnprintf(buf, sizeof(buf), format, ap);
73
26.5k
        LOG(INFO) << "[Rocksdb] " << buf;
74
26.5k
    }
75
};
76
77
91
// Opens (creating if missing) the RocksDB instance at `_root_path + META_POSTFIX` with the
// default, Doris and meta column families, and stores the DB and column-family handles.
// Returns META_OPEN_DB_ERROR if RocksDB fails to open.
Status OlapMeta::init() {
    // init db
    DBOptions options;
    options.IncreaseParallelism();
    options.create_if_missing = true;
    options.create_missing_column_families = true;
    options.info_log_level = rocksdb::WARN_LEVEL;
    // RocksDB applies `info_log_level` only to a logger it creates itself; a caller-supplied
    // logger keeps its own threshold. Set it explicitly so the WARN threshold above actually
    // filters what reaches RocksdbLogger.
    auto logger = std::make_shared<RocksdbLogger>();
    logger->SetInfoLogLevel(options.info_log_level);
    options.info_log = std::move(logger);

    std::string db_path = _root_path + META_POSTFIX;
    std::vector<ColumnFamilyDescriptor> column_families;
    // default column family is required
    column_families.emplace_back(DEFAULT_COLUMN_FAMILY, ColumnFamilyOptions());
    column_families.emplace_back(DORIS_COLUMN_FAMILY, ColumnFamilyOptions());

    // meta column family add prefix extractor to improve performance and ensure correctness
    ColumnFamilyOptions meta_column_family;
    meta_column_family.max_write_buffer_number = config::rocksdb_max_write_buffer_number;
    meta_column_family.prefix_extractor.reset(NewFixedPrefixTransform(PREFIX_LENGTH));
    column_families.emplace_back(META_COLUMN_FAMILY, meta_column_family);

    rocksdb::DB* db = nullptr;
    std::vector<rocksdb::ColumnFamilyHandle*> handles;
    rocksdb::Status s = DB::Open(options, db_path, column_families, &handles, &db);
    // Shutdown sequence: sync the WAL, stop background work, close, then free. Failures are
    // only logged because a deleter has no way to report them.
    _db = std::unique_ptr<rocksdb::DB, std::function<void(rocksdb::DB*)>>(db, [](rocksdb::DB* db) {
        rocksdb::Status s = db->SyncWAL();
        if (!s.ok()) {
            LOG(WARNING) << "rocksdb sync wal failed: " << s.ToString();
        }
        rocksdb::CancelAllBackgroundWork(db, true);
        s = db->Close();
        if (!s.ok()) {
            LOG(WARNING) << "rocksdb close failed: " << s.ToString();
        }
        LOG(INFO) << "finish close rocksdb for OlapMeta";

        delete db;
    });
    for (auto* handle : handles) {
        _handles.emplace_back(handle);
    }
    if (!s.ok() || _db == nullptr) {
        return Status::Error<META_OPEN_DB_ERROR>("rocks db open failed, reason: {}", s.ToString());
    }
    return Status::OK();
}
123
124
522
// Point lookup of `key` in the given column family; on success the value is written to
// `value`. Returns META_KEY_NOT_FOUND for a missing key and META_GET_ERROR for any other
// RocksDB failure. Only the RocksDB call itself is timed into g_meta_get_latency.
Status OlapMeta::get(const int column_family_index, const std::string& key, std::string* value) {
    auto* cf = _handles[column_family_index].get();
    int64_t elapsed_ns = 0;
    rocksdb::Status get_status;
    {
        SCOPED_RAW_TIMER(&elapsed_ns);
        get_status = _db->Get(ReadOptions(), cf, rocksdb::Slice(key), value);
    }
    g_meta_get_latency << (elapsed_ns / 1000);

    if (get_status.ok()) {
        return Status::OK();
    }
    if (get_status.IsNotFound()) {
        return Status::Error<META_KEY_NOT_FOUND>("OlapMeta::get meet not found key");
    }
    return Status::Error<META_GET_ERROR>("rocks db get failed, key: {}, reason: {}", key,
                                         get_status.ToString());
}
141
142
bool OlapMeta::key_may_exist(const int column_family_index, const std::string& key,
143
3
                             std::string* value) {
144
3
    auto& handle = _handles[column_family_index];
145
3
    int64_t duration_ns = 0;
146
3
    bool is_exist = false;
147
3
    {
148
3
        SCOPED_RAW_TIMER(&duration_ns);
149
3
        is_exist = _db->KeyMayExist(ReadOptions(), handle.get(), rocksdb::Slice(key), value);
150
3
    }
151
3
    g_meta_get_latency << (duration_ns / 1000);
152
153
3
    return is_exist;
154
3
}
155
156
// Writes a single key/value pair into the given column family. WriteOptions::sync follows
// config::sync_tablet_meta. Returns META_PUT_ERROR if RocksDB rejects the write.
Status OlapMeta::put(const int column_family_index, const std::string& key,
                     const std::string& value) {
    // log all params
    VLOG_DEBUG << "column_family_index: " << column_family_index << ", key: " << key
               << ", value: " << value;

    auto* cf = _handles[column_family_index].get();
    rocksdb::Status put_status;
    {
        int64_t elapsed_ns = 0;
        // Declared before the timer so it is destroyed after it, i.e. it records the
        // elapsed time only once the timer has written it.
        Defer record_latency([&] { g_meta_put_latency << (elapsed_ns / 1000); });
        SCOPED_RAW_TIMER(&elapsed_ns);

        WriteOptions opts;
        opts.sync = config::sync_tablet_meta;
        put_status = _db->Put(opts, cf, rocksdb::Slice(key), rocksdb::Slice(value));
    }

    if (put_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_PUT_ERROR>("rocks db put failed, key: {}, reason: {}", key,
                                         put_status.ToString());
}
180
181
2
// Writes all `entries` into one column family through a single rocksdb::WriteBatch.
// WriteOptions::sync follows config::sync_tablet_meta. Returns META_PUT_ERROR on failure.
Status OlapMeta::put(const int column_family_index, const std::vector<BatchEntry>& entries) {
    auto* handle = _handles[column_family_index].get();
    rocksdb::Status s;
    {
        int64_t duration_ns = 0;
        // Declared before the timer so it runs after the timer has stored the elapsed time.
        Defer defer([&] { g_meta_put_latency << (duration_ns / 1000); });
        SCOPED_RAW_TIMER(&duration_ns);

        // construct write batch
        rocksdb::WriteBatch write_batch;
        // Bind by const reference: the original `auto entry` copied every BatchEntry.
        for (const auto& entry : entries) {
            VLOG_DEBUG << "column_family_index: " << column_family_index << ", key: " << entry.key
                       << ", value: " << entry.value;
            write_batch.Put(handle, rocksdb::Slice(entry.key), rocksdb::Slice(entry.value));
        }

        // write to rocksdb
        WriteOptions write_options;
        write_options.sync = config::sync_tablet_meta;
        s = _db->Write(write_options, &write_batch);
    }

    if (!s.ok()) {
        return Status::Error<META_PUT_ERROR>("rocks db put failed, reason: {}", s.ToString());
    }
    return Status::OK();
}
208
209
0
// Commits a caller-built WriteBatch (not owned here) to RocksDB. WriteOptions::sync follows
// config::sync_tablet_meta. Returns META_PUT_ERROR on failure.
Status OlapMeta::put(rocksdb::WriteBatch* batch) {
    rocksdb::Status write_status;
    {
        int64_t elapsed_ns = 0;
        // Declared before the timer so it runs after the timer has stored the elapsed time.
        Defer record_latency([&] { g_meta_put_latency << (elapsed_ns / 1000); });
        SCOPED_RAW_TIMER(&elapsed_ns);

        WriteOptions opts;
        opts.sync = config::sync_tablet_meta;
        write_status = _db->Write(opts, batch);
    }

    if (write_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_PUT_ERROR>("rocks db put failed, reason: {}",
                                         write_status.ToString());
}
226
227
273
// Deletes `key` from the given column family. WriteOptions::sync follows
// config::sync_tablet_meta. Returns META_DELETE_ERROR on failure.
Status OlapMeta::remove(const int column_family_index, const std::string& key) {
    auto* cf = _handles[column_family_index].get();
    rocksdb::Status del_status;
    int64_t elapsed_ns = 0;
    {
        SCOPED_RAW_TIMER(&elapsed_ns);
        WriteOptions opts;
        opts.sync = config::sync_tablet_meta;
        del_status = _db->Delete(opts, cf, rocksdb::Slice(key));
    }
    g_meta_remove_latency << (elapsed_ns / 1000);

    if (del_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_DELETE_ERROR>("rocks db delete key: {}, failed, reason: {}", key,
                                            del_status.ToString());
}
244
245
5
// Deletes every key in `keys` from one column family through a single rocksdb::WriteBatch.
// WriteOptions::sync follows config::sync_tablet_meta. Returns META_DELETE_ERROR on failure.
Status OlapMeta::remove(const int column_family_index, const std::vector<std::string>& keys) {
    auto* cf = _handles[column_family_index].get();
    rocksdb::Status del_status;
    int64_t elapsed_ns = 0;
    {
        SCOPED_RAW_TIMER(&elapsed_ns);
        WriteOptions opts;
        opts.sync = config::sync_tablet_meta;
        rocksdb::WriteBatch deletions;
        for (const std::string& k : keys) {
            deletions.Delete(cf, rocksdb::Slice(k));
        }
        del_status = _db->Write(opts, &deletions);
    }
    g_meta_remove_latency << (elapsed_ns / 1000);

    if (del_status.ok()) {
        return Status::OK();
    }
    return Status::Error<META_DELETE_ERROR>("rocks db delete keys:{} failed, reason:{}", keys,
                                            del_status.ToString());
}
266
267
// Convenience overload: `prefix` doubles as the seek key, so iteration starts at the first
// key >= prefix and stops at the first key that no longer starts with it.
Status OlapMeta::iterate(const int column_family_index, std::string_view prefix,
                         std::function<bool(std::string_view, std::string_view)> const& func) {
    const std::string_view seek_key = prefix;
    return iterate(column_family_index, seek_key, prefix, func);
}
271
272
// Visits entries of the given column family starting at `seek_key` (or the first key when
// `seek_key` is empty), calling `func(key, value)` for each one. Iteration stops when `func`
// returns false, or — if `prefix` is non-empty — at the first key not starting with `prefix`.
// The views passed to `func` point into the iterator and must not be kept past the call.
// Returns META_ITERATOR_ERROR if the seek or the subsequent iteration reports an error.
Status OlapMeta::iterate(const int column_family_index, std::string_view seek_key,
                         std::string_view prefix,
                         std::function<bool(std::string_view, std::string_view)> const& func) {
    auto& handle = _handles[column_family_index];
    std::unique_ptr<Iterator> it(_db->NewIterator(ReadOptions(), handle.get()));
    if (seek_key.empty()) {
        it->SeekToFirst();
    } else {
        it->Seek({seek_key.data(), seek_key.size()});
    }
    rocksdb::Status status = it->status();
    if (!status.ok()) {
        return Status::Error<META_ITERATOR_ERROR>("rocksdb seek failed. reason: {}",
                                                  status.ToString());
    }

    // Loop-invariant: build the prefix slice once.
    const rocksdb::Slice prefix_slice(prefix.data(), prefix.size());
    for (; it->Valid(); it->Next()) {
        const rocksdb::Slice key = it->key();
        if (!prefix.empty() && !key.starts_with(prefix_slice)) {
            return Status::OK();
        }
        const rocksdb::Slice value = it->value();
        if (!func({key.data(), key.size()}, {value.data(), value.size()})) {
            break;
        }
    }

    // Re-read the iterator status: the `status` captured after the seek was OK by
    // construction, so reporting it here would hide the real failure reason.
    status = it->status();
    if (!status.ok()) {
        return Status::Error<META_ITERATOR_ERROR>("rocksdb iterator failed. reason: {}",
                                                  status.ToString());
    }

    return Status::OK();
}
307
308
} // namespace doris