be/src/storage/rowset/rowset_meta_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/rowset_meta_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | |
23 | | #include <boost/algorithm/string/trim.hpp> |
24 | | #include <fstream> |
25 | | #include <functional> |
26 | | #include <memory> |
27 | | #include <new> |
28 | | #include <string> |
29 | | #include <string_view> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/logging.h" |
33 | | #include "storage/binlog.h" |
34 | | #include "storage/olap_define.h" |
35 | | #include "storage/olap_meta.h" |
36 | | #include "storage/utils.h" |
37 | | #include "util/debug_points.h" |
38 | | |
39 | | namespace doris { |
40 | | using namespace ErrorCode; |
41 | | |
42 | | bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, |
43 | 0 | const RowsetId& rowset_id) { |
44 | 0 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
45 | 0 | std::string value; |
46 | 0 | return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value); |
47 | 0 | } |
48 | | |
49 | 40 | Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { |
50 | 40 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
51 | 40 | std::string value; |
52 | 40 | return meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
53 | 40 | } |
54 | | |
55 | | Status RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, |
56 | | const RowsetId& rowset_id, |
57 | 10 | RowsetMetaSharedPtr rowset_meta) { |
58 | 10 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
59 | 10 | std::string value; |
60 | 10 | Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
61 | 10 | if (s.is<META_KEY_NOT_FOUND>()) { |
62 | 3 | return Status::Error<META_KEY_NOT_FOUND>("rowset id: {} not found.", key); |
63 | 7 | } else if (!s.ok()) { |
64 | 0 | return Status::Error<IO_ERROR>("load rowset id: {} failed.", key); |
65 | 0 | } |
66 | 7 | bool ret = rowset_meta->init(value); |
67 | 7 | if (!ret) { |
68 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse rowset meta failed. rowset id: {}", |
69 | 0 | key); |
70 | 0 | } |
71 | 7 | return Status::OK(); |
72 | 7 | } |
73 | | |
74 | | Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, |
75 | 71 | const RowsetMetaPB& rowset_meta_pb, bool enable_binlog) { |
76 | 71 | if (rowset_meta_pb.partition_id() <= 0) { |
77 | 22 | LOG(WARNING) << "invalid partition id " << rowset_meta_pb.partition_id() << " tablet " |
78 | 22 | << rowset_meta_pb.tablet_id(); |
79 | | // TODO(dx): after fix partition id eq 0 bug, fix it |
80 | | // return Status::InternalError("invaid partition id {} tablet {}", |
81 | | // rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id()); |
82 | 22 | } |
83 | 71 | DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", { |
84 | 71 | long partition_id = rowset_meta_pb.partition_id(); |
85 | 71 | auto& rs_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb); |
86 | 71 | rs_pb.set_partition_id(0); |
87 | 71 | LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old=" |
88 | 71 | << partition_id << " new=" << rowset_meta_pb.DebugString(); |
89 | 71 | }); |
90 | 71 | if (enable_binlog) { |
91 | 2 | return _save_with_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb); |
92 | 69 | } else { |
93 | 69 | return _save(meta, tablet_uid, rowset_id, rowset_meta_pb); |
94 | 69 | } |
95 | 71 | } |
96 | | |
97 | | Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, |
98 | 69 | const RowsetMetaPB& rowset_meta_pb) { |
99 | 69 | std::string key = |
100 | 69 | fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string()); |
101 | 69 | std::string value; |
102 | 69 | if (!rowset_meta_pb.SerializeToString(&value)) { |
103 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}", |
104 | 0 | key); |
105 | 0 | } |
106 | | |
107 | 69 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
108 | 69 | } |
109 | | |
110 | | Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid, |
111 | | const RowsetId& rowset_id, |
112 | 2 | const RowsetMetaPB& rowset_meta_pb) { |
113 | | // create rowset write data |
114 | 2 | std::string rowset_key = |
115 | 2 | fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string()); |
116 | 2 | std::string rowset_value; |
117 | 2 | if (!rowset_meta_pb.SerializeToString(&rowset_value)) { |
118 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}", |
119 | 0 | rowset_key); |
120 | 0 | } |
121 | | |
122 | | // create binlog write data |
123 | | // binlog_meta_key format: {kBinlogPrefix}meta_{tablet_uid}_{version}_{rowset_id} |
124 | | // binlog_data_key format: {kBinlogPrefix}data_{tablet_uid}_{version}_{rowset_id} |
125 | | // version is formatted to 20 bytes to avoid the problem of sorting, version is lower, timestamp is lower |
126 | | // binlog key is not supported for cumulative rowset |
127 | 2 | if (rowset_meta_pb.start_version() != rowset_meta_pb.end_version()) { |
128 | 0 | return Status::Error<ROWSET_BINLOG_NOT_ONLY_ONE_VERSION>( |
129 | 0 | "binlog key is not supported for cumulative rowset. rowset id:{}", rowset_key); |
130 | 0 | } |
131 | 2 | auto version = rowset_meta_pb.start_version(); |
132 | 2 | std::string binlog_meta_key = make_binlog_meta_key(tablet_uid, version, rowset_id); |
133 | 2 | std::string binlog_data_key = make_binlog_data_key(tablet_uid, version, rowset_id); |
134 | 2 | BinlogMetaEntryPB binlog_meta_entry_pb; |
135 | 2 | binlog_meta_entry_pb.set_version(version); |
136 | 2 | binlog_meta_entry_pb.set_tablet_id(rowset_meta_pb.tablet_id()); |
137 | 2 | binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id()); |
138 | 2 | binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments()); |
139 | 2 | binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time()); |
140 | 2 | binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2()); |
141 | 2 | std::string binlog_meta_value; |
142 | 2 | if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) { |
143 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize binlog pb failed. rowset id:{}", |
144 | 0 | binlog_meta_key); |
145 | 0 | } |
146 | | |
147 | | // create batch entries |
148 | 2 | std::vector<OlapMeta::BatchEntry> entries = { |
149 | 2 | {std::cref(rowset_key), std::cref(rowset_value)}, |
150 | 2 | {std::cref(binlog_meta_key), std::cref(binlog_meta_value)}, |
151 | 2 | {std::cref(binlog_data_key), std::cref(rowset_value)}}; |
152 | | |
153 | 2 | return meta->put(META_COLUMN_FAMILY_INDEX, entries); |
154 | 2 | } |
155 | | |
156 | | std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta, |
157 | | TabletUid tablet_uid, |
158 | | std::string_view binlog_version, |
159 | 0 | int64_t segment_idx) { |
160 | 0 | auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version); |
161 | 0 | VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key); |
162 | |
|
163 | 0 | std::vector<std::string> binlog_files; |
164 | 0 | std::string rowset_id; |
165 | 0 | int64_t num_segments = -1; |
166 | 0 | auto traverse_func = [&rowset_id, &num_segments](std::string_view key, |
167 | 0 | std::string_view value) -> bool { |
168 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
169 | | // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593' |
170 | | // check starts with "binlog_meta_" |
171 | 0 | if (!starts_with_binlog_meta(key)) { |
172 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
173 | 0 | return false; |
174 | 0 | } |
175 | 0 | if (auto pos = key.rfind('_'); pos == std::string::npos) { |
176 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
177 | 0 | return false; |
178 | 0 | } else { |
179 | 0 | rowset_id = key.substr(pos + 1); |
180 | 0 | } |
181 | | |
182 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
183 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
184 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value); |
185 | 0 | return false; |
186 | 0 | } |
187 | 0 | num_segments = binlog_meta_entry_pb.num_segments(); |
188 | |
|
189 | 0 | return false; |
190 | 0 | }; |
191 | | |
192 | | // get binlog meta by prefix |
193 | 0 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
194 | 0 | if (!status.ok() || rowset_id.empty() || num_segments < 0) { |
195 | 0 | LOG(WARNING) << fmt::format( |
196 | 0 | "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, " |
197 | 0 | "rowset_id:{}, num_segments:{}", |
198 | 0 | tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id, |
199 | 0 | num_segments); |
200 | 0 | } |
201 | | |
202 | | // construct binlog_files list |
203 | 0 | if (segment_idx >= num_segments) { |
204 | 0 | LOG(WARNING) << fmt::format("invalid segment idx:{}, num_segments:{}", segment_idx, |
205 | 0 | num_segments); |
206 | 0 | return binlog_files; |
207 | 0 | } |
208 | 0 | for (int64_t i = 0; i < num_segments; ++i) { |
209 | | // TODO(Drogon): Update to filesystem path |
210 | 0 | auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); |
211 | 0 | binlog_files.emplace_back(std::move(segment_file)); |
212 | 0 | } |
213 | 0 | return binlog_files; |
214 | 0 | } |
215 | | |
216 | | std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info( |
217 | 0 | OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version) { |
218 | 0 | VLOG_DEBUG << fmt::format("tablet_uid:{}, binlog_version:{}", tablet_uid.to_string(), |
219 | 0 | binlog_version); |
220 | 0 | auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version); |
221 | 0 | VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key); |
222 | |
|
223 | 0 | std::string rowset_id; |
224 | 0 | int64_t num_segments = -1; |
225 | 0 | auto traverse_func = [&rowset_id, &num_segments](std::string_view key, |
226 | 0 | std::string_view value) -> bool { |
227 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
228 | | // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593' |
229 | 0 | auto pos = key.rfind('_'); |
230 | 0 | if (pos == std::string::npos) { |
231 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
232 | 0 | return false; |
233 | 0 | } |
234 | 0 | rowset_id = key.substr(pos + 1); |
235 | |
|
236 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
237 | 0 | binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size())); |
238 | 0 | num_segments = binlog_meta_entry_pb.num_segments(); |
239 | |
|
240 | 0 | return false; |
241 | 0 | }; |
242 | | |
243 | | // get binlog meta by prefix |
244 | 0 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
245 | 0 | if (!status.ok() || rowset_id.empty() || num_segments < 0) { |
246 | 0 | LOG(WARNING) << fmt::format( |
247 | 0 | "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, " |
248 | 0 | "rowset_id:{}, num_segments:{}", |
249 | 0 | tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id, |
250 | 0 | num_segments); |
251 | 0 | } |
252 | |
|
253 | 0 | return std::make_pair(rowset_id, num_segments); |
254 | 0 | } |
255 | | |
256 | | std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid, |
257 | | std::string_view binlog_version, |
258 | 0 | std::string_view rowset_id) { |
259 | 0 | auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id); |
260 | 0 | VLOG_DEBUG << fmt::format("get binlog_meta_key:{}", binlog_data_key); |
261 | |
|
262 | 0 | std::string binlog_meta_value; |
263 | 0 | Status status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_meta_value); |
264 | 0 | if (!status.ok()) { |
265 | 0 | LOG(WARNING) << fmt::format( |
266 | 0 | "fail to get binlog meta. tablet uid:{}, binlog version:{}, " |
267 | 0 | "rowset_id:{}, status:{}", |
268 | 0 | tablet_uid.to_string(), binlog_version, rowset_id, status.to_string()); |
269 | 0 | return ""; |
270 | 0 | } |
271 | 0 | return binlog_meta_value; |
272 | 0 | } |
273 | | |
274 | | Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
275 | | const std::vector<int64_t>& binlog_versions, |
276 | 0 | RowsetBinlogMetasPB* metas_pb) { |
277 | 0 | if (binlog_versions.empty()) { |
278 | 0 | return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb); |
279 | 0 | } else { |
280 | 0 | return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb); |
281 | 0 | } |
282 | 0 | } |
283 | | |
284 | | Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
285 | | const std::vector<int64_t>& binlog_versions, |
286 | 0 | RowsetBinlogMetasPB* metas_pb) { |
287 | 0 | Status status; |
288 | 0 | auto tablet_uid_str = tablet_uid.to_string(); |
289 | 0 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str]( |
290 | 0 | std::string_view key, std::string_view value) -> bool { |
291 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
292 | 0 | if (!starts_with_binlog_meta(key)) { |
293 | 0 | auto err_msg = fmt::format("invalid binlog meta key:{}", key); |
294 | 0 | status = Status::InternalError(err_msg); |
295 | 0 | LOG(WARNING) << err_msg; |
296 | 0 | return false; |
297 | 0 | } |
298 | | |
299 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
300 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
301 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
302 | 0 | status = Status::InternalError(err_msg); |
303 | 0 | LOG(WARNING) << err_msg; |
304 | 0 | return false; |
305 | 0 | } |
306 | 0 | auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
307 | |
|
308 | 0 | auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
309 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
310 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
311 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
312 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
313 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
314 | |
|
315 | 0 | auto binlog_data_key = |
316 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
317 | 0 | std::string binlog_data; |
318 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
319 | 0 | if (!status.ok()) { |
320 | 0 | LOG(WARNING) << status.to_string(); |
321 | 0 | return false; |
322 | 0 | } |
323 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
324 | 0 | binlog_meta_pb->set_data(binlog_data); |
325 | |
|
326 | 0 | return false; |
327 | 0 | }; |
328 | |
|
329 | 0 | for (auto& binlog_version : binlog_versions) { |
330 | 0 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, binlog_version); |
331 | 0 | Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
332 | 0 | if (!iterStatus.ok()) { |
333 | 0 | LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", |
334 | 0 | prefix_key, iterStatus.to_string()); |
335 | 0 | return iterStatus; |
336 | 0 | } |
337 | 0 | if (!status.ok()) { |
338 | 0 | return status; |
339 | 0 | } |
340 | 0 | } |
341 | 0 | return status; |
342 | 0 | } |
343 | | |
344 | | Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, |
345 | 4 | Version version, RowsetBinlogMetasPB* metas_pb) { |
346 | 4 | Status status; |
347 | 4 | auto tablet_uid_str = tablet_uid.to_string(); |
348 | 4 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); |
349 | 4 | auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first); |
350 | 4 | auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1); |
351 | 4 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key]( |
352 | 4 | std::string_view key, std::string_view value) -> bool { |
353 | 0 | VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value); |
354 | 0 | if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable. |
355 | | // All binlog meta has been scanned |
356 | 0 | return false; |
357 | 0 | } |
358 | | |
359 | 0 | if (!starts_with_binlog_meta(key)) { |
360 | 0 | auto err_msg = fmt::format("invalid binlog meta key:{}", key); |
361 | 0 | status = Status::InternalError(err_msg); |
362 | 0 | LOG(WARNING) << err_msg; |
363 | 0 | return false; |
364 | 0 | } |
365 | | |
366 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
367 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
368 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
369 | 0 | status = Status::InternalError(err_msg); |
370 | 0 | LOG(WARNING) << err_msg; |
371 | 0 | return false; |
372 | 0 | } |
373 | | |
374 | 0 | const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
375 | 0 | auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
376 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
377 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
378 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
379 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
380 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
381 | |
|
382 | 0 | auto binlog_data_key = |
383 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
384 | 0 | std::string binlog_data; |
385 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
386 | 0 | if (!status.ok()) { |
387 | 0 | LOG(WARNING) << status.to_string(); |
388 | 0 | return false; |
389 | 0 | } |
390 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
391 | 0 | binlog_meta_pb->set_data(binlog_data); |
392 | |
|
393 | 0 | return true; |
394 | 0 | }; |
395 | | |
396 | 4 | Status iterStatus = |
397 | 4 | meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func); |
398 | 4 | if (!iterStatus.ok()) { |
399 | 0 | LOG(WARNING) << fmt::format( |
400 | 0 | "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key, |
401 | 0 | version.to_string(), iterStatus.to_string()); |
402 | 0 | return iterStatus; |
403 | 0 | } |
404 | 4 | return status; |
405 | 4 | } |
406 | | |
407 | | Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
408 | 0 | RowsetBinlogMetasPB* metas_pb) { |
409 | 0 | Status status; |
410 | 0 | auto tablet_uid_str = tablet_uid.to_string(); |
411 | 0 | int64_t tablet_id = 0; |
412 | 0 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id]( |
413 | 0 | std::string_view key, std::string_view value) -> bool { |
414 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
415 | 0 | if (!starts_with_binlog_meta(key)) { |
416 | 0 | LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key); |
417 | 0 | return false; |
418 | 0 | } |
419 | | |
420 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
421 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
422 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
423 | 0 | status = Status::InternalError(err_msg); |
424 | 0 | LOG(WARNING) << err_msg; |
425 | 0 | return false; |
426 | 0 | } |
427 | 0 | if (tablet_id == 0) { |
428 | 0 | tablet_id = binlog_meta_entry_pb.tablet_id(); |
429 | 0 | } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) { |
430 | | // scan all binlog meta, so tablet_id should be same: |
431 | 0 | return false; |
432 | 0 | } |
433 | 0 | auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
434 | |
|
435 | 0 | auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
436 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
437 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
438 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
439 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
440 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
441 | |
|
442 | 0 | auto binlog_data_key = |
443 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
444 | 0 | std::string binlog_data; |
445 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
446 | 0 | if (!status.ok()) { |
447 | 0 | LOG(WARNING) << status; |
448 | 0 | return false; |
449 | 0 | } |
450 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
451 | 0 | binlog_meta_pb->set_data(binlog_data); |
452 | |
|
453 | 0 | return true; |
454 | 0 | }; |
455 | |
|
456 | 0 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); |
457 | 0 | Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
458 | 0 | if (!iterStatus.ok()) { |
459 | 0 | LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", |
460 | 0 | prefix_key, iterStatus.to_string()); |
461 | 0 | return iterStatus; |
462 | 0 | } |
463 | 0 | return status; |
464 | 0 | } |
465 | | |
466 | 7 | Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { |
467 | 7 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
468 | 7 | VLOG_NOTICE << "start to remove rowset, key:" << key; |
469 | 7 | Status status = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
470 | 7 | VLOG_NOTICE << "remove rowset key:" << key << " finished"; |
471 | 7 | return status; |
472 | 7 | } |
473 | | |
474 | 0 | Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) { |
475 | | // Please do not remove std::vector<std::string>, more info refer to pr#23190 |
476 | 0 | return meta->remove(META_COLUMN_FAMILY_INDEX, |
477 | 0 | std::vector<std::string> {kBinlogMetaPrefix.data() + suffix, |
478 | 0 | kBinlogDataPrefix.data() + suffix}); |
479 | 0 | } |
480 | | |
481 | | Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, |
482 | 0 | RowsetBinlogMetasPB* metas_pb) { |
483 | 0 | std::vector<OlapMeta::BatchEntry> entries; |
484 | 0 | const auto tablet_uid_str = tablet_uid.to_string(); |
485 | |
|
486 | 0 | for (auto& rowset_binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) { |
487 | 0 | auto& rowset_id = rowset_binlog_meta.rowset_id(); |
488 | 0 | auto version = rowset_binlog_meta.version(); |
489 | |
|
490 | 0 | auto meta_key = rowset_binlog_meta.mutable_meta_key(); |
491 | 0 | *meta_key = make_binlog_meta_key(tablet_uid_str, version, rowset_id); |
492 | 0 | auto data_key = rowset_binlog_meta.mutable_data_key(); |
493 | 0 | *data_key = make_binlog_data_key(tablet_uid_str, version, rowset_id); |
494 | |
|
495 | 0 | entries.emplace_back(*meta_key, rowset_binlog_meta.meta()); |
496 | 0 | entries.emplace_back(*data_key, rowset_binlog_meta.data()); |
497 | 0 | } |
498 | |
|
499 | 0 | return meta->put(META_COLUMN_FAMILY_INDEX, entries); |
500 | 0 | } |
501 | | |
502 | | Status RowsetMetaManager::traverse_rowset_metas( |
503 | | OlapMeta* meta, |
504 | 45 | std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func) { |
505 | 45 | auto traverse_rowset_meta_func = [&func](std::string_view key, std::string_view value) -> bool { |
506 | 0 | std::vector<std::string> parts; |
507 | | // key format: rst_uuid_rowset_id |
508 | 0 | RETURN_IF_ERROR(split_string(key, '_', &parts)); |
509 | 0 | if (parts.size() != 3) { |
510 | 0 | LOG(WARNING) << "invalid rowset key:" << key << ", splitted size:" << parts.size(); |
511 | 0 | return true; |
512 | 0 | } |
513 | 0 | RowsetId rowset_id; |
514 | 0 | rowset_id.init(parts[2]); |
515 | 0 | std::vector<std::string> uid_parts; |
516 | 0 | RETURN_IF_ERROR(split_string(parts[1], '-', &uid_parts)); |
517 | 0 | TabletUid tablet_uid(uid_parts[0], uid_parts[1]); |
518 | 0 | return func(tablet_uid, rowset_id, value); |
519 | 0 | }; |
520 | 45 | Status status = |
521 | 45 | meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func); |
522 | 45 | return status; |
523 | 45 | } |
524 | | |
525 | | Status RowsetMetaManager::traverse_binlog_metas( |
526 | | OlapMeta* meta, |
527 | 0 | std::function<bool(std::string_view, std::string_view, bool)> const& collector) { |
528 | 0 | std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false); |
529 | 0 | bool seek_found = false; |
530 | 0 | Status status; |
531 | 0 | auto traverse_binlog_meta_func = [&last_info, &seek_found, &collector]( |
532 | 0 | std::string_view key, std::string_view value) -> bool { |
533 | 0 | seek_found = true; |
534 | 0 | auto& [last_prefix, need_collect] = last_info; |
535 | 0 | size_t pos = key.find('_', kBinlogMetaPrefix.size()); |
536 | 0 | if (pos == std::string::npos) { |
537 | 0 | LOG(WARNING) << "invalid binlog meta key: " << key; |
538 | 0 | return true; |
539 | 0 | } |
540 | 0 | std::string_view key_view(key.data(), pos); |
541 | 0 | std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1); |
542 | |
|
543 | 0 | if (last_prefix_view != key_view) { |
544 | 0 | need_collect = collector(key, value, true); |
545 | 0 | last_prefix = std::string(key_view) + "~"; |
546 | 0 | } else if (need_collect) { |
547 | 0 | collector(key, value, false); |
548 | 0 | } |
549 | |
|
550 | 0 | return need_collect; |
551 | 0 | }; |
552 | |
|
553 | 0 | do { |
554 | 0 | seek_found = false; |
555 | 0 | status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(), |
556 | 0 | traverse_binlog_meta_func); |
557 | 0 | } while (status.ok() && seek_found); |
558 | |
|
559 | 0 | return status; |
560 | 0 | } |
561 | | |
562 | | Status RowsetMetaManager::save_partial_update_info( |
563 | | OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id, |
564 | 0 | const PartialUpdateInfoPB& partial_update_info_pb) { |
565 | 0 | std::string key = |
566 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
567 | 0 | std::string value; |
568 | 0 | if (!partial_update_info_pb.SerializeToString(&value)) { |
569 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>( |
570 | 0 | "serialize partial update info failed. key={}", key); |
571 | 0 | } |
572 | 0 | VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size(); |
573 | 0 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
574 | 0 | } |
575 | | |
576 | | Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id, |
577 | | int64_t partition_id, int64_t txn_id, |
578 | 0 | PartialUpdateInfoPB* partial_update_info_pb) { |
579 | 0 | std::string key = |
580 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
581 | 0 | std::string value; |
582 | 0 | Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
583 | 0 | if (status.is<META_KEY_NOT_FOUND>()) { |
584 | 0 | return status; |
585 | 0 | } |
586 | 0 | if (!status.ok()) { |
587 | 0 | LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}", |
588 | 0 | tablet_id, partition_id, txn_id); |
589 | 0 | return status; |
590 | 0 | } |
591 | 0 | if (!partial_update_info_pb->ParseFromString(value)) { |
592 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>( |
593 | 0 | "fail to parse partial update info content to protobuf object. tablet_id={}, " |
594 | 0 | "partition_id={}, txn_id={}", |
595 | 0 | tablet_id, partition_id, txn_id); |
596 | 0 | } |
597 | 0 | return Status::OK(); |
598 | 0 | } |
599 | | |
600 | | Status RowsetMetaManager::traverse_partial_update_info( |
601 | | OlapMeta* meta, |
602 | 0 | std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) { |
603 | 0 | auto traverse_partial_update_info_func = [&func](std::string_view key, |
604 | 0 | std::string_view value) -> bool { |
605 | 0 | std::vector<std::string> parts; |
606 | | // key format: pui_{tablet_id}_{partition_id}_{txn_id} |
607 | 0 | RETURN_IF_ERROR(split_string(key, '_', &parts)); |
608 | 0 | if (parts.size() != 4) { |
609 | 0 | LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size()); |
610 | 0 | return true; |
611 | 0 | } |
612 | 0 | int64_t tablet_id = std::stoll(parts[1]); |
613 | 0 | int64_t partition_id = std::stoll(parts[2]); |
614 | 0 | int64_t txn_id = std::stoll(parts[3]); |
615 | 0 | return func(tablet_id, partition_id, txn_id, value); |
616 | 0 | }; |
617 | 0 | return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX, |
618 | 0 | traverse_partial_update_info_func); |
619 | 0 | } |
620 | | |
621 | | Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id, |
622 | 0 | int64_t partition_id, int64_t txn_id) { |
623 | 0 | std::string key = |
624 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
625 | 0 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
626 | 0 | VLOG_NOTICE << "remove partial update info, key=" << key; |
627 | 0 | return res; |
628 | 0 | } |
629 | | |
630 | | Status RowsetMetaManager::remove_partial_update_infos( |
631 | 0 | OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) { |
632 | 0 | std::vector<std::string> remove_keys; |
633 | 0 | for (auto [tablet_id, partition_id, txn_id] : keys) { |
634 | 0 | remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, |
635 | 0 | partition_id, txn_id)); |
636 | 0 | } |
637 | 0 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
638 | 0 | VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size(); |
639 | 0 | return res; |
640 | 0 | } |
641 | | |
642 | | Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta, |
643 | 0 | int64_t tablet_id) { |
644 | 0 | std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id); |
645 | 0 | std::vector<std::string> remove_keys; |
646 | 0 | auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool { |
647 | 0 | remove_keys.emplace_back(key); |
648 | 0 | return true; |
649 | 0 | }; |
650 | 0 | VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id |
651 | 0 | << " removed keys size: " << remove_keys.size(); |
652 | 0 | RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func)); |
653 | 0 | return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
654 | 0 | } |
655 | | } // namespace doris |