be/src/storage/rowset/rowset_meta_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "storage/rowset/rowset_meta_manager.h"

#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>

#include <boost/algorithm/string/trim.hpp>
#include <charconv>
#include <fstream>
#include <functional>
#include <memory>
#include <new>
#include <string>
#include <string_view>
#include <system_error>
#include <vector>

#include "common/logging.h"
#include "storage/binlog.h"
#include "storage/olap_define.h"
#include "storage/olap_meta.h"
#include "storage/utils.h"
#include "util/debug_points.h"
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | using namespace ErrorCode; |
42 | | |
43 | | bool RowsetMetaManager::check_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, |
44 | 0 | const RowsetId& rowset_id) { |
45 | 0 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
46 | 0 | std::string value; |
47 | 0 | return meta->key_may_exist(META_COLUMN_FAMILY_INDEX, key, &value); |
48 | 0 | } |
49 | | |
50 | 40 | Status RowsetMetaManager::exists(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { |
51 | 40 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
52 | 40 | std::string value; |
53 | 40 | return meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
54 | 40 | } |
55 | | |
56 | | Status RowsetMetaManager::get_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, |
57 | | const RowsetId& rowset_id, |
58 | 9 | RowsetMetaSharedPtr rowset_meta) { |
59 | 9 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
60 | 9 | std::string value; |
61 | 9 | Status s = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
62 | 9 | if (s.is<META_KEY_NOT_FOUND>()) { |
63 | 3 | return Status::Error<META_KEY_NOT_FOUND>("rowset id: {} not found.", key); |
64 | 6 | } else if (!s.ok()) { |
65 | 0 | return Status::Error<IO_ERROR>("load rowset id: {} failed.", key); |
66 | 0 | } |
67 | 6 | bool ret = rowset_meta->init(value); |
68 | 6 | if (!ret) { |
69 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("parse rowset meta failed. rowset id: {}", |
70 | 0 | key); |
71 | 0 | } |
72 | 6 | return Status::OK(); |
73 | 6 | } |
74 | | |
75 | | Status RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, |
76 | 69 | const RowsetMetaPB& rowset_meta_pb, bool enable_binlog) { |
77 | 69 | if (rowset_meta_pb.partition_id() <= 0) { |
78 | 22 | LOG(WARNING) << "invalid partition id " << rowset_meta_pb.partition_id() << " tablet " |
79 | 22 | << rowset_meta_pb.tablet_id(); |
80 | | // TODO(dx): after fix partition id eq 0 bug, fix it |
81 | | // return Status::InternalError("invaid partition id {} tablet {}", |
82 | | // rowset_meta_pb.partition_id(), rowset_meta_pb.tablet_id()); |
83 | 22 | } |
84 | 69 | DBUG_EXECUTE_IF("RowsetMetaManager::save::zero_partition_id", { |
85 | 69 | long partition_id = rowset_meta_pb.partition_id(); |
86 | 69 | auto& rs_pb = const_cast<std::decay_t<decltype(rowset_meta_pb)>&>(rowset_meta_pb); |
87 | 69 | rs_pb.set_partition_id(0); |
88 | 69 | LOG(WARNING) << "set debug point RowsetMetaManager::save::zero_partition_id old=" |
89 | 69 | << partition_id << " new=" << rowset_meta_pb.DebugString(); |
90 | 69 | }); |
91 | 69 | if (enable_binlog) { |
92 | 2 | return _save_with_binlog(meta, tablet_uid, rowset_id, rowset_meta_pb); |
93 | 67 | } else { |
94 | 67 | return _save(meta, tablet_uid, rowset_id, rowset_meta_pb); |
95 | 67 | } |
96 | 69 | } |
97 | | |
98 | | Status RowsetMetaManager::_save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, |
99 | 67 | const RowsetMetaPB& rowset_meta_pb) { |
100 | 67 | std::string key = |
101 | 67 | fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string()); |
102 | 67 | std::string value; |
103 | 67 | if (!rowset_meta_pb.SerializeToString(&value)) { |
104 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}", |
105 | 0 | key); |
106 | 0 | } |
107 | | |
108 | 67 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
109 | 67 | } |
110 | | |
111 | | Status RowsetMetaManager::_save_with_binlog(OlapMeta* meta, TabletUid tablet_uid, |
112 | | const RowsetId& rowset_id, |
113 | 2 | const RowsetMetaPB& rowset_meta_pb) { |
114 | | // create rowset write data |
115 | 2 | std::string rowset_key = |
116 | 2 | fmt::format("{}{}_{}", ROWSET_PREFIX, tablet_uid.to_string(), rowset_id.to_string()); |
117 | 2 | std::string rowset_value; |
118 | 2 | if (!rowset_meta_pb.SerializeToString(&rowset_value)) { |
119 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize rowset pb failed. rowset id:{}", |
120 | 0 | rowset_key); |
121 | 0 | } |
122 | | |
123 | | // create binlog write data |
124 | | // binlog_meta_key format: {kBinlogPrefix}meta_{tablet_uid}_{version}_{rowset_id} |
125 | | // binlog_data_key format: {kBinlogPrefix}data_{tablet_uid}_{version}_{rowset_id} |
126 | | // version is formatted to 20 bytes to avoid the problem of sorting, version is lower, timestamp is lower |
127 | | // binlog key is not supported for cumulative rowset |
128 | 2 | if (rowset_meta_pb.start_version() != rowset_meta_pb.end_version()) { |
129 | 0 | return Status::Error<ROWSET_BINLOG_NOT_ONLY_ONE_VERSION>( |
130 | 0 | "binlog key is not supported for cumulative rowset. rowset id:{}", rowset_key); |
131 | 0 | } |
132 | 2 | auto version = rowset_meta_pb.start_version(); |
133 | 2 | std::string binlog_meta_key = make_binlog_meta_key(tablet_uid, version, rowset_id); |
134 | 2 | std::string binlog_data_key = make_binlog_data_key(tablet_uid, version, rowset_id); |
135 | 2 | BinlogMetaEntryPB binlog_meta_entry_pb; |
136 | 2 | binlog_meta_entry_pb.set_version(version); |
137 | 2 | binlog_meta_entry_pb.set_tablet_id(rowset_meta_pb.tablet_id()); |
138 | 2 | binlog_meta_entry_pb.set_rowset_id(rowset_meta_pb.rowset_id()); |
139 | 2 | binlog_meta_entry_pb.set_num_segments(rowset_meta_pb.num_segments()); |
140 | 2 | binlog_meta_entry_pb.set_creation_time(rowset_meta_pb.creation_time()); |
141 | 2 | binlog_meta_entry_pb.set_rowset_id_v2(rowset_meta_pb.rowset_id_v2()); |
142 | 2 | std::string binlog_meta_value; |
143 | 2 | if (!binlog_meta_entry_pb.SerializeToString(&binlog_meta_value)) { |
144 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize binlog pb failed. rowset id:{}", |
145 | 0 | binlog_meta_key); |
146 | 0 | } |
147 | | |
148 | | // create batch entries |
149 | 2 | std::vector<OlapMeta::BatchEntry> entries = { |
150 | 2 | {std::cref(rowset_key), std::cref(rowset_value)}, |
151 | 2 | {std::cref(binlog_meta_key), std::cref(binlog_meta_value)}, |
152 | 2 | {std::cref(binlog_data_key), std::cref(rowset_value)}}; |
153 | | |
154 | 2 | return meta->put(META_COLUMN_FAMILY_INDEX, entries); |
155 | 2 | } |
156 | | |
157 | | std::vector<std::string> RowsetMetaManager::get_binlog_filenames(OlapMeta* meta, |
158 | | TabletUid tablet_uid, |
159 | | std::string_view binlog_version, |
160 | 0 | int64_t segment_idx) { |
161 | 0 | auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version); |
162 | 0 | VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key); |
163 | |
|
164 | 0 | std::vector<std::string> binlog_files; |
165 | 0 | std::string rowset_id; |
166 | 0 | int64_t num_segments = -1; |
167 | 0 | auto traverse_func = [&rowset_id, &num_segments](std::string_view key, |
168 | 0 | std::string_view value) -> bool { |
169 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
170 | | // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593' |
171 | | // check starts with "binlog_meta_" |
172 | 0 | if (!starts_with_binlog_meta(key)) { |
173 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
174 | 0 | return false; |
175 | 0 | } |
176 | 0 | if (auto pos = key.rfind('_'); pos == std::string::npos) { |
177 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
178 | 0 | return false; |
179 | 0 | } else { |
180 | 0 | rowset_id = key.substr(pos + 1); |
181 | 0 | } |
182 | | |
183 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
184 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
185 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta value:{}", value); |
186 | 0 | return false; |
187 | 0 | } |
188 | 0 | num_segments = binlog_meta_entry_pb.num_segments(); |
189 | |
|
190 | 0 | return false; |
191 | 0 | }; |
192 | | |
193 | | // get binlog meta by prefix |
194 | 0 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
195 | 0 | if (!status.ok() || rowset_id.empty() || num_segments < 0) { |
196 | 0 | LOG(WARNING) << fmt::format( |
197 | 0 | "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, " |
198 | 0 | "rowset_id:{}, num_segments:{}", |
199 | 0 | tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id, |
200 | 0 | num_segments); |
201 | 0 | } |
202 | | |
203 | | // construct binlog_files list |
204 | 0 | if (segment_idx >= num_segments) { |
205 | 0 | LOG(WARNING) << fmt::format("invalid segment idx:{}, num_segments:{}", segment_idx, |
206 | 0 | num_segments); |
207 | 0 | return binlog_files; |
208 | 0 | } |
209 | 0 | for (int64_t i = 0; i < num_segments; ++i) { |
210 | | // TODO(Drogon): Update to filesystem path |
211 | 0 | auto segment_file = fmt::format("{}_{}.dat", rowset_id, i); |
212 | 0 | binlog_files.emplace_back(std::move(segment_file)); |
213 | 0 | } |
214 | 0 | return binlog_files; |
215 | 0 | } |
216 | | |
217 | | std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info( |
218 | 0 | OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version) { |
219 | 0 | VLOG_DEBUG << fmt::format("tablet_uid:{}, binlog_version:{}", tablet_uid.to_string(), |
220 | 0 | binlog_version); |
221 | 0 | auto prefix_key = make_binlog_filename_key(tablet_uid, binlog_version); |
222 | 0 | VLOG_DEBUG << fmt::format("prefix_key:{}", prefix_key); |
223 | |
|
224 | 0 | std::string rowset_id; |
225 | 0 | int64_t num_segments = -1; |
226 | 0 | auto traverse_func = [&rowset_id, &num_segments](std::string_view key, |
227 | 0 | std::string_view value) -> bool { |
228 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
229 | | // key is 'binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7_00000000000000000069_020000000000000135449d7cd7eadfe672aa0f928fa99593', extract last part '020000000000000135449d7cd7eadfe672aa0f928fa99593' |
230 | 0 | auto pos = key.rfind('_'); |
231 | 0 | if (pos == std::string::npos) { |
232 | 0 | LOG(WARNING) << fmt::format("invalid binlog meta key:{}", key); |
233 | 0 | return false; |
234 | 0 | } |
235 | 0 | rowset_id = key.substr(pos + 1); |
236 | |
|
237 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
238 | 0 | binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size())); |
239 | 0 | num_segments = binlog_meta_entry_pb.num_segments(); |
240 | |
|
241 | 0 | return false; |
242 | 0 | }; |
243 | | |
244 | | // get binlog meta by prefix |
245 | 0 | Status status = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
246 | 0 | if (!status.ok() || rowset_id.empty() || num_segments < 0) { |
247 | 0 | LOG(WARNING) << fmt::format( |
248 | 0 | "fail to get binlog filename. tablet uid:{}, binlog version:{}, status:{}, " |
249 | 0 | "rowset_id:{}, num_segments:{}", |
250 | 0 | tablet_uid.to_string(), binlog_version, status.to_string(), rowset_id, |
251 | 0 | num_segments); |
252 | 0 | } |
253 | |
|
254 | 0 | return std::make_pair(rowset_id, num_segments); |
255 | 0 | } |
256 | | |
257 | | std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid, |
258 | | std::string_view binlog_version, |
259 | 0 | std::string_view rowset_id) { |
260 | 0 | auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id); |
261 | 0 | VLOG_DEBUG << fmt::format("get binlog_meta_key:{}", binlog_data_key); |
262 | |
|
263 | 0 | std::string binlog_meta_value; |
264 | 0 | Status status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_meta_value); |
265 | 0 | if (!status.ok()) { |
266 | 0 | LOG(WARNING) << fmt::format( |
267 | 0 | "fail to get binlog meta. tablet uid:{}, binlog version:{}, " |
268 | 0 | "rowset_id:{}, status:{}", |
269 | 0 | tablet_uid.to_string(), binlog_version, rowset_id, status.to_string()); |
270 | 0 | return ""; |
271 | 0 | } |
272 | 0 | return binlog_meta_value; |
273 | 0 | } |
274 | | |
275 | | Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
276 | | const std::vector<int64_t>& binlog_versions, |
277 | 0 | RowsetBinlogMetasPB* metas_pb) { |
278 | 0 | if (binlog_versions.empty()) { |
279 | 0 | return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb); |
280 | 0 | } else { |
281 | 0 | return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb); |
282 | 0 | } |
283 | 0 | } |
284 | | |
285 | | Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
286 | | const std::vector<int64_t>& binlog_versions, |
287 | 0 | RowsetBinlogMetasPB* metas_pb) { |
288 | 0 | Status status; |
289 | 0 | auto tablet_uid_str = tablet_uid.to_string(); |
290 | 0 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str]( |
291 | 0 | std::string_view key, std::string_view value) -> bool { |
292 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
293 | 0 | if (!starts_with_binlog_meta(key)) { |
294 | 0 | auto err_msg = fmt::format("invalid binlog meta key:{}", key); |
295 | 0 | status = Status::InternalError(err_msg); |
296 | 0 | LOG(WARNING) << err_msg; |
297 | 0 | return false; |
298 | 0 | } |
299 | | |
300 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
301 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
302 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
303 | 0 | status = Status::InternalError(err_msg); |
304 | 0 | LOG(WARNING) << err_msg; |
305 | 0 | return false; |
306 | 0 | } |
307 | 0 | auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
308 | |
|
309 | 0 | auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
310 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
311 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
312 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
313 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
314 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
315 | |
|
316 | 0 | auto binlog_data_key = |
317 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
318 | 0 | std::string binlog_data; |
319 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
320 | 0 | if (!status.ok()) { |
321 | 0 | LOG(WARNING) << status.to_string(); |
322 | 0 | return false; |
323 | 0 | } |
324 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
325 | 0 | binlog_meta_pb->set_data(binlog_data); |
326 | |
|
327 | 0 | return false; |
328 | 0 | }; |
329 | |
|
330 | 0 | for (auto& binlog_version : binlog_versions) { |
331 | 0 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, binlog_version); |
332 | 0 | Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
333 | 0 | if (!iterStatus.ok()) { |
334 | 0 | LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", |
335 | 0 | prefix_key, iterStatus.to_string()); |
336 | 0 | return iterStatus; |
337 | 0 | } |
338 | 0 | if (!status.ok()) { |
339 | 0 | return status; |
340 | 0 | } |
341 | 0 | } |
342 | 0 | return status; |
343 | 0 | } |
344 | | |
345 | | Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, |
346 | 4 | Version version, RowsetBinlogMetasPB* metas_pb) { |
347 | 4 | Status status; |
348 | 4 | auto tablet_uid_str = tablet_uid.to_string(); |
349 | 4 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); |
350 | 4 | auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first); |
351 | 4 | auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1); |
352 | 4 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key]( |
353 | 4 | std::string_view key, std::string_view value) -> bool { |
354 | 0 | VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value); |
355 | 0 | if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable. |
356 | | // All binlog meta has been scanned |
357 | 0 | return false; |
358 | 0 | } |
359 | | |
360 | 0 | if (!starts_with_binlog_meta(key)) { |
361 | 0 | auto err_msg = fmt::format("invalid binlog meta key:{}", key); |
362 | 0 | status = Status::InternalError(err_msg); |
363 | 0 | LOG(WARNING) << err_msg; |
364 | 0 | return false; |
365 | 0 | } |
366 | | |
367 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
368 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
369 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
370 | 0 | status = Status::InternalError(err_msg); |
371 | 0 | LOG(WARNING) << err_msg; |
372 | 0 | return false; |
373 | 0 | } |
374 | | |
375 | 0 | const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
376 | 0 | auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
377 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
378 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
379 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
380 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
381 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
382 | |
|
383 | 0 | auto binlog_data_key = |
384 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
385 | 0 | std::string binlog_data; |
386 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
387 | 0 | if (!status.ok()) { |
388 | 0 | LOG(WARNING) << status.to_string(); |
389 | 0 | return false; |
390 | 0 | } |
391 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
392 | 0 | binlog_meta_pb->set_data(binlog_data); |
393 | |
|
394 | 0 | return true; |
395 | 0 | }; |
396 | | |
397 | 4 | Status iterStatus = |
398 | 4 | meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func); |
399 | 4 | if (!iterStatus.ok()) { |
400 | 0 | LOG(WARNING) << fmt::format( |
401 | 0 | "fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key, |
402 | 0 | version.to_string(), iterStatus.to_string()); |
403 | 0 | return iterStatus; |
404 | 0 | } |
405 | 4 | return status; |
406 | 4 | } |
407 | | |
408 | | Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, |
409 | 0 | RowsetBinlogMetasPB* metas_pb) { |
410 | 0 | Status status; |
411 | 0 | auto tablet_uid_str = tablet_uid.to_string(); |
412 | 0 | int64_t tablet_id = 0; |
413 | 0 | auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id]( |
414 | 0 | std::string_view key, std::string_view value) -> bool { |
415 | 0 | VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); |
416 | 0 | if (!starts_with_binlog_meta(key)) { |
417 | 0 | LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key); |
418 | 0 | return false; |
419 | 0 | } |
420 | | |
421 | 0 | BinlogMetaEntryPB binlog_meta_entry_pb; |
422 | 0 | if (!binlog_meta_entry_pb.ParseFromArray(value.data(), cast_set<int32_t>(value.size()))) { |
423 | 0 | auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); |
424 | 0 | status = Status::InternalError(err_msg); |
425 | 0 | LOG(WARNING) << err_msg; |
426 | 0 | return false; |
427 | 0 | } |
428 | 0 | if (tablet_id == 0) { |
429 | 0 | tablet_id = binlog_meta_entry_pb.tablet_id(); |
430 | 0 | } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) { |
431 | | // scan all binlog meta, so tablet_id should be same: |
432 | 0 | return false; |
433 | 0 | } |
434 | 0 | auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); |
435 | |
|
436 | 0 | auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); |
437 | 0 | binlog_meta_pb->set_rowset_id(rowset_id); |
438 | 0 | binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); |
439 | 0 | binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); |
440 | 0 | binlog_meta_pb->set_meta_key(std::string {key}); |
441 | 0 | binlog_meta_pb->set_meta(std::string {value}); |
442 | |
|
443 | 0 | auto binlog_data_key = |
444 | 0 | make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); |
445 | 0 | std::string binlog_data; |
446 | 0 | status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); |
447 | 0 | if (!status.ok()) { |
448 | 0 | LOG(WARNING) << status; |
449 | 0 | return false; |
450 | 0 | } |
451 | 0 | binlog_meta_pb->set_data_key(binlog_data_key); |
452 | 0 | binlog_meta_pb->set_data(binlog_data); |
453 | |
|
454 | 0 | return true; |
455 | 0 | }; |
456 | |
|
457 | 0 | auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); |
458 | 0 | Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); |
459 | 0 | if (!iterStatus.ok()) { |
460 | 0 | LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", |
461 | 0 | prefix_key, iterStatus.to_string()); |
462 | 0 | return iterStatus; |
463 | 0 | } |
464 | 0 | return status; |
465 | 0 | } |
466 | | |
467 | 7 | Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { |
468 | 7 | std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); |
469 | 7 | VLOG_NOTICE << "start to remove rowset, key:" << key; |
470 | 7 | Status status = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
471 | 7 | VLOG_NOTICE << "remove rowset key:" << key << " finished"; |
472 | 7 | return status; |
473 | 7 | } |
474 | | |
475 | 0 | Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffix) { |
476 | | // Please do not remove std::vector<std::string>, more info refer to pr#23190 |
477 | 0 | return meta->remove(META_COLUMN_FAMILY_INDEX, |
478 | 0 | std::vector<std::string> {kBinlogMetaPrefix.data() + suffix, |
479 | 0 | kBinlogDataPrefix.data() + suffix}); |
480 | 0 | } |
481 | | |
482 | | Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, |
483 | 0 | RowsetBinlogMetasPB* metas_pb) { |
484 | 0 | std::vector<OlapMeta::BatchEntry> entries; |
485 | 0 | const auto tablet_uid_str = tablet_uid.to_string(); |
486 | |
|
487 | 0 | for (auto& rowset_binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) { |
488 | 0 | auto& rowset_id = rowset_binlog_meta.rowset_id(); |
489 | 0 | auto version = rowset_binlog_meta.version(); |
490 | |
|
491 | 0 | auto meta_key = rowset_binlog_meta.mutable_meta_key(); |
492 | 0 | *meta_key = make_binlog_meta_key(tablet_uid_str, version, rowset_id); |
493 | 0 | auto data_key = rowset_binlog_meta.mutable_data_key(); |
494 | 0 | *data_key = make_binlog_data_key(tablet_uid_str, version, rowset_id); |
495 | |
|
496 | 0 | entries.emplace_back(*meta_key, rowset_binlog_meta.meta()); |
497 | 0 | entries.emplace_back(*data_key, rowset_binlog_meta.data()); |
498 | 0 | } |
499 | |
|
500 | 0 | return meta->put(META_COLUMN_FAMILY_INDEX, entries); |
501 | 0 | } |
502 | | |
503 | | Status RowsetMetaManager::traverse_rowset_metas( |
504 | | OlapMeta* meta, |
505 | 45 | std::function<bool(const TabletUid&, const RowsetId&, std::string_view)> const& func) { |
506 | 45 | auto traverse_rowset_meta_func = [&func](std::string_view key, std::string_view value) -> bool { |
507 | 0 | std::vector<std::string> parts; |
508 | | // key format: rst_uuid_rowset_id |
509 | 0 | RETURN_IF_ERROR(split_string(key, '_', &parts)); |
510 | 0 | if (parts.size() != 3) { |
511 | 0 | LOG(WARNING) << "invalid rowset key:" << key << ", splitted size:" << parts.size(); |
512 | 0 | return true; |
513 | 0 | } |
514 | 0 | RowsetId rowset_id; |
515 | 0 | rowset_id.init(parts[2]); |
516 | 0 | std::vector<std::string> uid_parts; |
517 | 0 | RETURN_IF_ERROR(split_string(parts[1], '-', &uid_parts)); |
518 | 0 | TabletUid tablet_uid(uid_parts[0], uid_parts[1]); |
519 | 0 | return func(tablet_uid, rowset_id, value); |
520 | 0 | }; |
521 | 45 | Status status = |
522 | 45 | meta->iterate(META_COLUMN_FAMILY_INDEX, ROWSET_PREFIX, traverse_rowset_meta_func); |
523 | 45 | return status; |
524 | 45 | } |
525 | | |
526 | | Status RowsetMetaManager::traverse_binlog_metas( |
527 | | OlapMeta* meta, |
528 | 0 | std::function<bool(std::string_view, std::string_view, bool)> const& collector) { |
529 | 0 | std::pair<std::string, bool> last_info = std::make_pair(kBinlogMetaPrefix.data(), false); |
530 | 0 | bool seek_found = false; |
531 | 0 | Status status; |
532 | 0 | auto traverse_binlog_meta_func = [&last_info, &seek_found, &collector]( |
533 | 0 | std::string_view key, std::string_view value) -> bool { |
534 | 0 | seek_found = true; |
535 | 0 | auto& [last_prefix, need_collect] = last_info; |
536 | 0 | size_t pos = key.find('_', kBinlogMetaPrefix.size()); |
537 | 0 | if (pos == std::string::npos) { |
538 | 0 | LOG(WARNING) << "invalid binlog meta key: " << key; |
539 | 0 | return true; |
540 | 0 | } |
541 | 0 | std::string_view key_view(key.data(), pos); |
542 | 0 | std::string_view last_prefix_view(last_prefix.data(), last_prefix.size() - 1); |
543 | |
|
544 | 0 | if (last_prefix_view != key_view) { |
545 | 0 | need_collect = collector(key, value, true); |
546 | 0 | last_prefix = std::string(key_view) + "~"; |
547 | 0 | } else if (need_collect) { |
548 | 0 | collector(key, value, false); |
549 | 0 | } |
550 | |
|
551 | 0 | return need_collect; |
552 | 0 | }; |
553 | |
|
554 | 0 | do { |
555 | 0 | seek_found = false; |
556 | 0 | status = meta->iterate(META_COLUMN_FAMILY_INDEX, last_info.first, kBinlogMetaPrefix.data(), |
557 | 0 | traverse_binlog_meta_func); |
558 | 0 | } while (status.ok() && seek_found); |
559 | |
|
560 | 0 | return status; |
561 | 0 | } |
562 | | |
563 | | Status RowsetMetaManager::save_partial_update_info( |
564 | | OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id, |
565 | 0 | const PartialUpdateInfoPB& partial_update_info_pb) { |
566 | 0 | std::string key = |
567 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
568 | 0 | std::string value; |
569 | 0 | if (!partial_update_info_pb.SerializeToString(&value)) { |
570 | 0 | return Status::Error<SERIALIZE_PROTOBUF_ERROR>( |
571 | 0 | "serialize partial update info failed. key={}", key); |
572 | 0 | } |
573 | 0 | VLOG_NOTICE << "save partial update info, key=" << key << ", value_size=" << value.size(); |
574 | 0 | return meta->put(META_COLUMN_FAMILY_INDEX, key, value); |
575 | 0 | } |
576 | | |
577 | | Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id, |
578 | | int64_t partition_id, int64_t txn_id, |
579 | 0 | PartialUpdateInfoPB* partial_update_info_pb) { |
580 | 0 | std::string key = |
581 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
582 | 0 | std::string value; |
583 | 0 | Status status = meta->get(META_COLUMN_FAMILY_INDEX, key, &value); |
584 | 0 | if (status.is<META_KEY_NOT_FOUND>()) { |
585 | 0 | return status; |
586 | 0 | } |
587 | 0 | if (!status.ok()) { |
588 | 0 | LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}", |
589 | 0 | tablet_id, partition_id, txn_id); |
590 | 0 | return status; |
591 | 0 | } |
592 | 0 | if (!partial_update_info_pb->ParseFromString(value)) { |
593 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>( |
594 | 0 | "fail to parse partial update info content to protobuf object. tablet_id={}, " |
595 | 0 | "partition_id={}, txn_id={}", |
596 | 0 | tablet_id, partition_id, txn_id); |
597 | 0 | } |
598 | 0 | return Status::OK(); |
599 | 0 | } |
600 | | |
601 | | Status RowsetMetaManager::traverse_partial_update_info( |
602 | | OlapMeta* meta, |
603 | 0 | std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) { |
604 | 0 | auto traverse_partial_update_info_func = [&func](std::string_view key, |
605 | 0 | std::string_view value) -> bool { |
606 | 0 | std::vector<std::string> parts; |
607 | | // key format: pui_{tablet_id}_{partition_id}_{txn_id} |
608 | 0 | RETURN_IF_ERROR(split_string(key, '_', &parts)); |
609 | 0 | if (parts.size() != 4) { |
610 | 0 | LOG_WARNING("invalid rowset key={}, splitted size={}", key, parts.size()); |
611 | 0 | return true; |
612 | 0 | } |
613 | 0 | int64_t tablet_id = std::stoll(parts[1]); |
614 | 0 | int64_t partition_id = std::stoll(parts[2]); |
615 | 0 | int64_t txn_id = std::stoll(parts[3]); |
616 | 0 | return func(tablet_id, partition_id, txn_id, value); |
617 | 0 | }; |
618 | 0 | return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX, |
619 | 0 | traverse_partial_update_info_func); |
620 | 0 | } |
621 | | |
622 | | Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id, |
623 | 0 | int64_t partition_id, int64_t txn_id) { |
624 | 0 | std::string key = |
625 | 0 | fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id); |
626 | 0 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key); |
627 | 0 | VLOG_NOTICE << "remove partial update info, key=" << key; |
628 | 0 | return res; |
629 | 0 | } |
630 | | |
631 | | Status RowsetMetaManager::remove_partial_update_infos( |
632 | 0 | OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) { |
633 | 0 | std::vector<std::string> remove_keys; |
634 | 0 | for (auto [tablet_id, partition_id, txn_id] : keys) { |
635 | 0 | remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, |
636 | 0 | partition_id, txn_id)); |
637 | 0 | } |
638 | 0 | Status res = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
639 | 0 | VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size(); |
640 | 0 | return res; |
641 | 0 | } |
642 | | |
643 | | Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta, |
644 | 0 | int64_t tablet_id) { |
645 | 0 | std::string prefix = fmt::format("{}{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id); |
646 | 0 | std::vector<std::string> remove_keys; |
647 | 0 | auto get_remove_keys_func = [&](std::string_view key, std::string_view val) -> bool { |
648 | 0 | remove_keys.emplace_back(key); |
649 | 0 | return true; |
650 | 0 | }; |
651 | 0 | VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id |
652 | 0 | << " removed keys size: " << remove_keys.size(); |
653 | 0 | RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func)); |
654 | 0 | return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys); |
655 | 0 | } |
656 | | #include "common/compile_check_end.h" |
657 | | } // namespace doris |