be/src/storage/tablet/tablet_meta_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "storage/tablet/tablet_meta_manager.h"

#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

#include <boost/algorithm/string/trim.hpp>
#include <exception>
#include <fstream>
#include <string>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "json2pb/json_to_pb.h"
#include "json2pb/pb_to_json.h"
#include "storage/data_dir.h"
#include "storage/olap_define.h"
#include "storage/olap_meta.h"
#include "storage/utils.h"

38 | | |
39 | | namespace rocksdb { |
40 | | class Iterator; |
41 | | class Slice; |
42 | | class Status; |
43 | | struct ColumnFamilyOptions; |
44 | | struct DBOptions; |
45 | | struct ReadOptions; |
46 | | struct WriteOptions; |
47 | | } // namespace rocksdb |
48 | | namespace doris { |
49 | | #include "common/compile_check_begin.h" |
50 | | using namespace ErrorCode; |
51 | | |
52 | | // should use tablet->generate_tablet_meta_copy() method to get a copy of current tablet meta |
53 | | // there are some rowset meta in local meta store and in in-memory tablet meta |
54 | | // but not in tablet meta in local meta store |
55 | | Status TabletMetaManager::get_meta(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
56 | 467 | TabletMetaSharedPtr tablet_meta) { |
57 | 467 | OlapMeta* meta = store->get_meta(); |
58 | 467 | std::stringstream key_stream; |
59 | 467 | key_stream << HEADER_PREFIX << tablet_id << "_" << schema_hash; |
60 | 467 | std::string key = key_stream.str(); |
61 | 467 | std::string value; |
62 | 467 | Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
63 | 467 | if (s.is<META_KEY_NOT_FOUND>()) { |
64 | 11 | return Status::Error<META_KEY_NOT_FOUND>("tablet_id:{}, schema_hash:{}, not found.", |
65 | 11 | tablet_id, schema_hash); |
66 | 456 | } else if (!s.ok()) { |
67 | 0 | LOG(WARNING) << "load tablet_id:" << tablet_id << ", schema_hash:" << schema_hash |
68 | 0 | << " failed."; |
69 | 0 | return s; |
70 | 0 | } |
71 | 456 | return tablet_meta->deserialize(value); |
72 | 467 | } |
73 | | |
74 | | Status TabletMetaManager::get_json_meta(DataDir* store, TTabletId tablet_id, |
75 | 2 | TSchemaHash schema_hash, std::string* json_meta) { |
76 | 2 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
77 | 2 | Status s = get_meta(store, tablet_id, schema_hash, tablet_meta); |
78 | 2 | if (!s.ok()) { |
79 | 0 | return s; |
80 | 0 | } |
81 | 2 | json2pb::Pb2JsonOptions json_options; |
82 | 2 | json_options.pretty_json = true; |
83 | 2 | tablet_meta->to_json(json_meta, json_options); |
84 | 2 | return Status::OK(); |
85 | 2 | } |
86 | | |
87 | | // TODO(ygl): |
88 | | // 1. if term > 0 then save to remote meta store first using term |
89 | | // 2. save to local meta store |
90 | | Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
91 | 1 | TabletMetaSharedPtr tablet_meta, std::string_view header_prefix) { |
92 | 1 | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
93 | 1 | std::string value; |
94 | 1 | tablet_meta->serialize(&value); |
95 | 1 | if (tablet_meta->partition_id() <= 0) { |
96 | 0 | LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << " tablet " |
97 | 0 | << tablet_meta->tablet_id(); |
98 | | // TODO(dx): after fix partition id eq 0 bug, fix it |
99 | | // return Status::InternalError("invaid partition id {} tablet {}", |
100 | | // tablet_meta->partition_id(), tablet_meta->tablet_id()); |
101 | 0 | } |
102 | 1 | OlapMeta* meta = store->get_meta(); |
103 | 1 | VLOG_NOTICE << "save tablet meta" |
104 | 1 | << ", key:" << key << ", meta length:" << value.length(); |
105 | 1 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
106 | 1 | } |
107 | | |
108 | | Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
109 | 573 | const std::string& meta_binary, std::string_view header_prefix) { |
110 | 573 | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
111 | 573 | OlapMeta* meta = store->get_meta(); |
112 | 573 | VLOG_NOTICE << "save tablet meta " |
113 | 483 | << ", key:" << key << " meta_size=" << meta_binary.length(); |
114 | 573 | return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary); |
115 | 573 | } |
116 | | |
117 | | // TODO(ygl): |
118 | | // 1. remove load data first |
119 | | // 2. remove from load meta store using term if term > 0 |
120 | | Status TabletMetaManager::remove(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
121 | 226 | std::string_view header_prefix) { |
122 | 226 | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
123 | 226 | OlapMeta* meta = store->get_meta(); |
124 | 226 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
125 | 226 | VLOG_NOTICE << "remove tablet_meta, key:" << key << ", res:" << res; |
126 | 226 | return res; |
127 | 226 | } |
128 | | |
129 | | Status TabletMetaManager::traverse_headers( |
130 | | OlapMeta* meta, std::function<bool(long, long, std::string_view)> const& func, |
131 | 90 | std::string_view header_prefix) { |
132 | 90 | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
133 | 0 | std::vector<std::string> parts; |
134 | | // old format key format: "hdr_" + tablet_id + "_" + schema_hash 0.11 |
135 | | // new format key format: "tabletmeta_" + tablet_id + "_" + schema_hash 0.10 |
136 | 0 | static_cast<void>(split_string(key, '_', &parts)); |
137 | 0 | if (parts.size() != 3) { |
138 | 0 | LOG(WARNING) << "invalid tablet_meta key:" << key << ", split size:" << parts.size(); |
139 | 0 | return true; |
140 | 0 | } |
141 | 0 | TTabletId tablet_id = std::stol(parts[1], nullptr, 10); |
142 | 0 | TSchemaHash schema_hash = cast_set<int32_t>(std::stol(parts[2], nullptr, 10)); |
143 | 0 | return func(tablet_id, schema_hash, value); |
144 | 0 | }; |
145 | 90 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, header_prefix, traverse_header_func); |
146 | 90 | return status; |
147 | 90 | } |
148 | | |
149 | 1 | Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta_path) { |
150 | 1 | std::ifstream infile(meta_path); |
151 | 1 | char buffer[102400]; |
152 | 1 | std::string json_meta; |
153 | 119 | while (!infile.eof()) { |
154 | 118 | infile.getline(buffer, 102400); |
155 | 118 | json_meta = json_meta + buffer; |
156 | 118 | } |
157 | 1 | boost::algorithm::trim(json_meta); |
158 | 1 | TabletMetaPB tablet_meta_pb; |
159 | 1 | std::string error; |
160 | 1 | bool ret = json2pb::JsonToProtoMessage(json_meta, &tablet_meta_pb, &error); |
161 | 1 | if (!ret) { |
162 | 0 | return Status::Error<HEADER_LOAD_JSON_HEADER>("JSON to protobuf message failed: {}", error); |
163 | 0 | } |
164 | | |
165 | 1 | std::string meta_binary; |
166 | 1 | tablet_meta_pb.SerializeToString(&meta_binary); |
167 | 1 | TTabletId tablet_id = tablet_meta_pb.tablet_id(); |
168 | 1 | TSchemaHash schema_hash = tablet_meta_pb.schema_hash(); |
169 | 1 | return save(store, tablet_id, schema_hash, meta_binary); |
170 | 1 | } |
171 | | |
172 | | Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id, |
173 | | int64_t publish_version, |
174 | 2.03k | const std::string& meta_binary) { |
175 | 2.03k | std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version); |
176 | 2.03k | OlapMeta* meta = store->get_meta(); |
177 | 2.03k | LOG(INFO) << "save pending publish rowset, key:" << key |
178 | 2.03k | << " meta_size=" << meta_binary.length(); |
179 | 2.03k | return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary); |
180 | 2.03k | } |
181 | | |
182 | | Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id, |
183 | 37 | int64_t publish_version) { |
184 | 37 | std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version); |
185 | 37 | OlapMeta* meta = store->get_meta(); |
186 | 37 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
187 | 37 | LOG(INFO) << "remove pending publish_info, key:" << key << ", res:" << res; |
188 | 37 | return res; |
189 | 37 | } |
190 | | |
191 | | Status TabletMetaManager::traverse_pending_publish( |
192 | 45 | OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) { |
193 | 45 | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
194 | 0 | std::vector<std::string> parts; |
195 | | // key format: "ppi_" + tablet_id + "_" + publish_version |
196 | 0 | static_cast<void>(split_string(key, '_', &parts)); |
197 | 0 | if (parts.size() != 3) { |
198 | 0 | LOG(WARNING) << "invalid pending publish info key:" << key |
199 | 0 | << ", split size:" << parts.size(); |
200 | 0 | return true; |
201 | 0 | } |
202 | 0 | int64_t tablet_id = std::stol(parts[1], nullptr, 10); |
203 | 0 | int64_t version = std::stol(parts[2], nullptr, 10); |
204 | 0 | return func(tablet_id, version, value); |
205 | 0 | }; |
206 | 45 | Status status = |
207 | 45 | meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, traverse_header_func); |
208 | 45 | return status; |
209 | 45 | } |
210 | | |
211 | 305 | std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id, int64_t version) { |
212 | 305 | std::string key; |
213 | 305 | key.reserve(20); |
214 | 305 | key.append(DELETE_BITMAP); |
215 | 305 | put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id)); |
216 | 305 | put_fixed64_le(&key, to_endian<std::endian::big>(version)); |
217 | 305 | return key; |
218 | 305 | } |
219 | | |
220 | 3 | std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id) { |
221 | 3 | std::string key; |
222 | 3 | key.reserve(12); |
223 | 3 | key.append(DELETE_BITMAP); |
224 | 3 | put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id)); |
225 | 3 | return key; |
226 | 3 | } |
227 | | |
228 | | void NO_SANITIZE_UNDEFINED TabletMetaManager::decode_delete_bitmap_key(std::string_view enc_key, |
229 | | TTabletId* tablet_id, |
230 | 599 | int64_t* version) { |
231 | 599 | DCHECK_EQ(enc_key.size(), 20); |
232 | 599 | *tablet_id = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 4)); |
233 | 599 | *version = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 12)); |
234 | 599 | } |
235 | | |
236 | | Status TabletMetaManager::save_delete_bitmap(DataDir* store, TTabletId tablet_id, |
237 | 303 | DeleteBitmapPtr delete_bitmap, int64_t version) { |
238 | 303 | VLOG_NOTICE << "save delete bitmap, tablet_id:" << tablet_id << ", version: " << version; |
239 | 303 | if (delete_bitmap->delete_bitmap.empty()) { |
240 | 2 | return Status::OK(); |
241 | 2 | } |
242 | 301 | OlapMeta* meta = store->get_meta(); |
243 | 301 | DeleteBitmapPB delete_bitmap_pb; |
244 | 7.50k | for (auto& [id, bitmap] : delete_bitmap->delete_bitmap) { |
245 | 7.50k | auto& rowset_id = std::get<0>(id); |
246 | 7.50k | auto segment_id = std::get<1>(id); |
247 | 7.50k | delete_bitmap_pb.add_rowset_ids(rowset_id.to_string()); |
248 | 7.50k | delete_bitmap_pb.add_segment_ids(segment_id); |
249 | 7.50k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
250 | 7.50k | bitmap.write(bitmap_data.data()); |
251 | 7.50k | *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
252 | 7.50k | } |
253 | 301 | std::string key = encode_delete_bitmap_key(tablet_id, version); |
254 | 301 | std::string val; |
255 | 301 | bool ok = delete_bitmap_pb.SerializeToString(&val); |
256 | 301 | if (!ok) { |
257 | 0 | auto msg = fmt::format("failed to serialize delete bitmap, tablet_id: {}, version: {}", |
258 | 0 | tablet_id, version); |
259 | 0 | LOG(WARNING) << msg; |
260 | 0 | return Status::InternalError(msg); |
261 | 0 | } |
262 | 301 | return meta->put(META_COLUMN_FAMILY_INDEX, key, val); |
263 | 301 | } |
264 | | |
265 | | Status TabletMetaManager::traverse_delete_bitmap( |
266 | 49 | OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) { |
267 | 598 | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
268 | 598 | TTabletId tablet_id; |
269 | 598 | int64_t version; |
270 | 598 | decode_delete_bitmap_key(key, &tablet_id, &version); |
271 | 598 | VLOG_NOTICE << "traverse delete bitmap, tablet_id: " << tablet_id |
272 | 598 | << ", version: " << version; |
273 | 598 | return func(tablet_id, version, value); |
274 | 598 | }; |
275 | 49 | return meta->iterate(META_COLUMN_FAMILY_INDEX, DELETE_BITMAP, traverse_header_func); |
276 | 49 | } |
277 | | |
278 | | Status TabletMetaManager::remove_old_version_delete_bitmap(DataDir* store, TTabletId tablet_id, |
279 | 3 | int64_t version) { |
280 | 3 | OlapMeta* meta = store->get_meta(); |
281 | 3 | std::string begin_key = encode_delete_bitmap_key(tablet_id); |
282 | 3 | std::string end_key = encode_delete_bitmap_key(tablet_id, version); |
283 | | |
284 | 3 | std::vector<std::string> remove_keys; |
285 | 302 | auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool { |
286 | | // include end_key |
287 | 302 | if (key > end_key) { |
288 | 2 | return false; |
289 | 2 | } |
290 | 300 | remove_keys.emplace_back(key); |
291 | 300 | return true; |
292 | 302 | }; |
293 | 3 | LOG(INFO) << "remove old version delete bitmap, tablet_id: " << tablet_id |
294 | 3 | << " version: " << version << ", removed keys size: " << remove_keys.size(); |
295 | 3 | RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, get_remove_keys_func)); |
296 | 3 | return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
297 | 3 | } |
298 | | |
299 | | #include "common/compile_check_end.h" |
300 | | } // namespace doris |