be/src/storage/tablet/tablet_meta_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "storage/tablet/tablet_meta_manager.h"

#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

#include <boost/algorithm/string/trim.hpp>
#include <exception>
#include <fstream>
#include <string>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "json2pb/json_to_pb.h"
#include "json2pb/pb_to_json.h"
#include "storage/data_dir.h"
#include "storage/olap_define.h"
#include "storage/olap_meta.h"
#include "storage/utils.h"
38 | | |
39 | | namespace rocksdb { |
40 | | class Iterator; |
41 | | class Slice; |
42 | | class Status; |
43 | | struct ColumnFamilyOptions; |
44 | | struct DBOptions; |
45 | | struct ReadOptions; |
46 | | struct WriteOptions; |
47 | | } // namespace rocksdb |
48 | | namespace doris { |
49 | | using namespace ErrorCode; |
50 | | |
51 | | // should use tablet->generate_tablet_meta_copy() method to get a copy of current tablet meta |
52 | | // there are some rowset meta in local meta store and in in-memory tablet meta |
53 | | // but not in tablet meta in local meta store |
54 | | Status TabletMetaManager::get_meta(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
55 | 934 | TabletMetaSharedPtr tablet_meta) { |
56 | 934 | OlapMeta* meta = store->get_meta(); |
57 | 934 | std::stringstream key_stream; |
58 | 934 | key_stream << HEADER_PREFIX << tablet_id << "_" << schema_hash; |
59 | 934 | std::string key = key_stream.str(); |
60 | 934 | std::string value; |
61 | 934 | Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
62 | 934 | if (s.is<META_KEY_NOT_FOUND>()) { |
63 | 22 | return Status::Error<META_KEY_NOT_FOUND>("tablet_id:{}, schema_hash:{}, not found.", |
64 | 22 | tablet_id, schema_hash); |
65 | 912 | } else if (!s.ok()) { |
66 | 0 | LOG(WARNING) << "load tablet_id:" << tablet_id << ", schema_hash:" << schema_hash |
67 | 0 | << " failed."; |
68 | 0 | return s; |
69 | 0 | } |
70 | 912 | return tablet_meta->deserialize(value); |
71 | 934 | } |
72 | | |
73 | | Status TabletMetaManager::get_json_meta(DataDir* store, TTabletId tablet_id, |
74 | 4 | TSchemaHash schema_hash, std::string* json_meta) { |
75 | 4 | TabletMetaSharedPtr tablet_meta(new TabletMeta()); |
76 | 4 | Status s = get_meta(store, tablet_id, schema_hash, tablet_meta); |
77 | 4 | if (!s.ok()) { |
78 | 0 | return s; |
79 | 0 | } |
80 | 4 | json2pb::Pb2JsonOptions json_options; |
81 | 4 | json_options.pretty_json = true; |
82 | 4 | tablet_meta->to_json(json_meta, json_options); |
83 | 4 | return Status::OK(); |
84 | 4 | } |
85 | | |
86 | | // TODO(ygl): |
87 | | // 1. if term > 0 then save to remote meta store first using term |
88 | | // 2. save to local meta store |
89 | | Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
90 | 2 | TabletMetaSharedPtr tablet_meta, std::string_view header_prefix) { |
91 | 2 | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
92 | 2 | std::string value; |
93 | 2 | tablet_meta->serialize(&value); |
94 | 2 | if (tablet_meta->partition_id() <= 0) { |
95 | 0 | LOG(WARNING) << "invalid partition id " << tablet_meta->partition_id() << " tablet " |
96 | 0 | << tablet_meta->tablet_id(); |
97 | | // TODO(dx): after fix partition id eq 0 bug, fix it |
98 | | // return Status::InternalError("invaid partition id {} tablet {}", |
99 | | // tablet_meta->partition_id(), tablet_meta->tablet_id()); |
100 | 0 | } |
101 | 2 | OlapMeta* meta = store->get_meta(); |
102 | 2 | VLOG_NOTICE << "save tablet meta" |
103 | 2 | << ", key:" << key << ", meta length:" << value.length(); |
104 | 2 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
105 | 2 | } |
106 | | |
107 | | Status TabletMetaManager::save(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
108 | 1.14k | const std::string& meta_binary, std::string_view header_prefix) { |
109 | 1.14k | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
110 | 1.14k | OlapMeta* meta = store->get_meta(); |
111 | 1.14k | VLOG_NOTICE << "save tablet meta " |
112 | 966 | << ", key:" << key << " meta_size=" << meta_binary.length(); |
113 | 1.14k | return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary); |
114 | 1.14k | } |
115 | | |
116 | | // TODO(ygl): |
117 | | // 1. remove load data first |
118 | | // 2. remove from load meta store using term if term > 0 |
119 | | Status TabletMetaManager::remove(DataDir* store, TTabletId tablet_id, TSchemaHash schema_hash, |
120 | 452 | std::string_view header_prefix) { |
121 | 452 | std::string key = fmt::format("{}{}_{}", header_prefix, tablet_id, schema_hash); |
122 | 452 | OlapMeta* meta = store->get_meta(); |
123 | 452 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
124 | 452 | VLOG_NOTICE << "remove tablet_meta, key:" << key << ", res:" << res; |
125 | 452 | return res; |
126 | 452 | } |
127 | | |
128 | | Status TabletMetaManager::traverse_headers( |
129 | | OlapMeta* meta, std::function<bool(long, long, std::string_view)> const& func, |
130 | 179 | std::string_view header_prefix) { |
131 | 179 | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
132 | 0 | std::vector<std::string> parts; |
133 | | // old format key format: "hdr_" + tablet_id + "_" + schema_hash 0.11 |
134 | | // new format key format: "tabletmeta_" + tablet_id + "_" + schema_hash 0.10 |
135 | 0 | static_cast<void>(split_string(key, '_', &parts)); |
136 | 0 | if (parts.size() != 3) { |
137 | 0 | LOG(WARNING) << "invalid tablet_meta key:" << key << ", split size:" << parts.size(); |
138 | 0 | return true; |
139 | 0 | } |
140 | 0 | TTabletId tablet_id = std::stol(parts[1], nullptr, 10); |
141 | 0 | TSchemaHash schema_hash = cast_set<int32_t>(std::stol(parts[2], nullptr, 10)); |
142 | 0 | return func(tablet_id, schema_hash, value); |
143 | 0 | }; |
144 | 179 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, header_prefix, traverse_header_func); |
145 | 179 | return status; |
146 | 179 | } |
147 | | |
148 | 2 | Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta_path) { |
149 | 2 | std::ifstream infile(meta_path); |
150 | 2 | char buffer[102400]; |
151 | 2 | std::string json_meta; |
152 | 238 | while (!infile.eof()) { |
153 | 236 | infile.getline(buffer, 102400); |
154 | 236 | json_meta = json_meta + buffer; |
155 | 236 | } |
156 | 2 | boost::algorithm::trim(json_meta); |
157 | 2 | TabletMetaPB tablet_meta_pb; |
158 | 2 | std::string error; |
159 | 2 | bool ret = json2pb::JsonToProtoMessage(json_meta, &tablet_meta_pb, &error); |
160 | 2 | if (!ret) { |
161 | 0 | return Status::Error<HEADER_LOAD_JSON_HEADER>("JSON to protobuf message failed: {}", error); |
162 | 0 | } |
163 | | |
164 | 2 | std::string meta_binary; |
165 | 2 | tablet_meta_pb.SerializeToString(&meta_binary); |
166 | 2 | TTabletId tablet_id = tablet_meta_pb.tablet_id(); |
167 | 2 | TSchemaHash schema_hash = tablet_meta_pb.schema_hash(); |
168 | 2 | return save(store, tablet_id, schema_hash, meta_binary); |
169 | 2 | } |
170 | | |
171 | | Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id, |
172 | | int64_t publish_version, |
173 | 4.07k | const std::string& meta_binary) { |
174 | 4.07k | std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version); |
175 | 4.07k | OlapMeta* meta = store->get_meta(); |
176 | 4.07k | LOG(INFO) << "save pending publish rowset, key:" << key |
177 | 4.07k | << " meta_size=" << meta_binary.length(); |
178 | 4.07k | return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary); |
179 | 4.07k | } |
180 | | |
181 | | Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id, |
182 | 74 | int64_t publish_version) { |
183 | 74 | std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version); |
184 | 74 | OlapMeta* meta = store->get_meta(); |
185 | 74 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
186 | 74 | LOG(INFO) << "remove pending publish_info, key:" << key << ", res:" << res; |
187 | 74 | return res; |
188 | 74 | } |
189 | | |
190 | | Status TabletMetaManager::traverse_pending_publish( |
191 | 92 | OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) { |
192 | 92 | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
193 | 14 | std::vector<std::string> parts; |
194 | | // key format: "ppi_" + tablet_id + "_" + publish_version |
195 | 14 | static_cast<void>(split_string(key, '_', &parts)); |
196 | 14 | if (parts.size() != 3) { |
197 | 0 | LOG(WARNING) << "invalid pending publish info key:" << key |
198 | 0 | << ", split size:" << parts.size(); |
199 | 0 | return true; |
200 | 0 | } |
201 | 14 | int64_t tablet_id = std::stol(parts[1], nullptr, 10); |
202 | 14 | int64_t version = std::stol(parts[2], nullptr, 10); |
203 | 14 | return func(tablet_id, version, value); |
204 | 14 | }; |
205 | 92 | Status status = |
206 | 92 | meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, traverse_header_func); |
207 | 92 | return status; |
208 | 92 | } |
209 | | |
210 | 610 | std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id, int64_t version) { |
211 | 610 | std::string key; |
212 | 610 | key.reserve(20); |
213 | 610 | key.append(DELETE_BITMAP); |
214 | 610 | put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id)); |
215 | 610 | put_fixed64_le(&key, to_endian<std::endian::big>(version)); |
216 | 610 | return key; |
217 | 610 | } |
218 | | |
219 | 6 | std::string TabletMetaManager::encode_delete_bitmap_key(TTabletId tablet_id) { |
220 | 6 | std::string key; |
221 | 6 | key.reserve(12); |
222 | 6 | key.append(DELETE_BITMAP); |
223 | 6 | put_fixed64_le(&key, to_endian<std::endian::big>(tablet_id)); |
224 | 6 | return key; |
225 | 6 | } |
226 | | |
227 | | void NO_SANITIZE_UNDEFINED TabletMetaManager::decode_delete_bitmap_key(std::string_view enc_key, |
228 | | TTabletId* tablet_id, |
229 | 1.19k | int64_t* version) { |
230 | 1.19k | DCHECK_EQ(enc_key.size(), 20); |
231 | 1.19k | *tablet_id = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 4)); |
232 | 1.19k | *version = to_endian<std::endian::big>(unaligned_load<uint64_t>(enc_key.data() + 12)); |
233 | 1.19k | } |
234 | | |
235 | | Status TabletMetaManager::save_delete_bitmap(DataDir* store, TTabletId tablet_id, |
236 | 606 | DeleteBitmapPtr delete_bitmap, int64_t version) { |
237 | 606 | VLOG_NOTICE << "save delete bitmap, tablet_id:" << tablet_id << ", version: " << version; |
238 | 606 | if (delete_bitmap->delete_bitmap.empty()) { |
239 | 4 | return Status::OK(); |
240 | 4 | } |
241 | 602 | OlapMeta* meta = store->get_meta(); |
242 | 602 | DeleteBitmapPB delete_bitmap_pb; |
243 | 15.0k | for (auto& [id, bitmap] : delete_bitmap->delete_bitmap) { |
244 | 15.0k | auto& rowset_id = std::get<0>(id); |
245 | 15.0k | auto segment_id = std::get<1>(id); |
246 | 15.0k | delete_bitmap_pb.add_rowset_ids(rowset_id.to_string()); |
247 | 15.0k | delete_bitmap_pb.add_segment_ids(segment_id); |
248 | 15.0k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
249 | 15.0k | bitmap.write(bitmap_data.data()); |
250 | 15.0k | *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
251 | 15.0k | } |
252 | 602 | std::string key = encode_delete_bitmap_key(tablet_id, version); |
253 | 602 | std::string val; |
254 | 602 | bool ok = delete_bitmap_pb.SerializeToString(&val); |
255 | 602 | if (!ok) { |
256 | 0 | auto msg = fmt::format("failed to serialize delete bitmap, tablet_id: {}, version: {}", |
257 | 0 | tablet_id, version); |
258 | 0 | LOG(WARNING) << msg; |
259 | 0 | return Status::InternalError(msg); |
260 | 0 | } |
261 | 602 | return meta->put(META_COLUMN_FAMILY_INDEX, key, val); |
262 | 602 | } |
263 | | |
264 | | Status TabletMetaManager::traverse_delete_bitmap( |
265 | 98 | OlapMeta* meta, std::function<bool(int64_t, int64_t, std::string_view)> const& func) { |
266 | 1.19k | auto traverse_header_func = [&func](std::string_view key, std::string_view value) -> bool { |
267 | 1.19k | TTabletId tablet_id; |
268 | 1.19k | int64_t version; |
269 | 1.19k | decode_delete_bitmap_key(key, &tablet_id, &version); |
270 | 1.19k | VLOG_NOTICE << "traverse delete bitmap, tablet_id: " << tablet_id |
271 | 1.19k | << ", version: " << version; |
272 | 1.19k | return func(tablet_id, version, value); |
273 | 1.19k | }; |
274 | 98 | return meta->iterate(META_COLUMN_FAMILY_INDEX, DELETE_BITMAP, traverse_header_func); |
275 | 98 | } |
276 | | |
277 | | Status TabletMetaManager::remove_old_version_delete_bitmap(DataDir* store, TTabletId tablet_id, |
278 | 6 | int64_t version) { |
279 | 6 | OlapMeta* meta = store->get_meta(); |
280 | 6 | std::string begin_key = encode_delete_bitmap_key(tablet_id); |
281 | 6 | std::string end_key = encode_delete_bitmap_key(tablet_id, version); |
282 | | |
283 | 6 | std::vector<std::string> remove_keys; |
284 | 604 | auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool { |
285 | | // include end_key |
286 | 604 | if (key > end_key) { |
287 | 4 | return false; |
288 | 4 | } |
289 | 600 | remove_keys.emplace_back(key); |
290 | 600 | return true; |
291 | 604 | }; |
292 | 6 | LOG(INFO) << "remove old version delete bitmap, tablet_id: " << tablet_id |
293 | 6 | << " version: " << version << ", removed keys size: " << remove_keys.size(); |
294 | 6 | RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, get_remove_keys_func)); |
295 | 6 | return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
296 | 6 | } |
297 | | |
298 | | } // namespace doris |