be/src/storage/txn/txn_manager.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/txn/txn_manager.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | #include <fmt/format.h> |
22 | | #include <fmt/ranges.h> |
23 | | #include <thrift/protocol/TDebugProtocol.h> |
24 | | #include <time.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <iterator> |
28 | | #include <list> |
29 | | #include <new> |
30 | | #include <ostream> |
31 | | #include <queue> |
32 | | #include <set> |
33 | | #include <string> |
34 | | |
35 | | #include "common/config.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "load/delta_writer/delta_writer.h" |
39 | | #include "storage/data_dir.h" |
40 | | #include "storage/olap_common.h" |
41 | | #include "storage/partial_update_info.h" |
42 | | #include "storage/rowset/beta_rowset.h" |
43 | | #include "storage/rowset/pending_rowset_helper.h" |
44 | | #include "storage/rowset/rowset_meta.h" |
45 | | #include "storage/rowset/rowset_meta_manager.h" |
46 | | #include "storage/schema_change/schema_change.h" |
47 | | #include "storage/segment/segment_loader.h" |
48 | | #include "storage/storage_engine.h" |
49 | | #include "storage/tablet/tablet_manager.h" |
50 | | #include "storage/tablet/tablet_meta.h" |
51 | | #include "storage/tablet/tablet_meta_manager.h" |
52 | | #include "storage/task/engine_publish_version_task.h" |
53 | | #include "util/debug_points.h" |
54 | | #include "util/time.h" |
55 | | |
56 | | namespace doris { |
57 | | class OlapMeta; |
58 | | } // namespace doris |
59 | | |
60 | | using std::map; |
61 | | using std::pair; |
62 | | using std::set; |
63 | | using std::string; |
64 | | using std::stringstream; |
65 | | using std::vector; |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
// bvar counter tied to the (partition_id, transaction_id) keys stored in the txn tablet maps.
// In this file it is incremented in prepare_txn() when a key is not yet present in the shard
// map, and decremented in _remove_txn_tablet_info_unlocked() when the last tablet entry of a
// key is erased.
70 | | bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count"); |
71 | | |
72 | | TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size) |
73 | 349 | : _engine(engine), |
74 | 349 | _txn_map_shard_size(txn_map_shard_size), |
75 | 349 | _txn_shard_size(txn_shard_size) { |
76 | 349 | DCHECK_GT(_txn_map_shard_size, 0); |
77 | 349 | DCHECK_GT(_txn_shard_size, 0); |
78 | 349 | DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0); |
79 | 349 | DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0); |
80 | 349 | _txn_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
81 | 349 | _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; |
82 | 349 | _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; |
83 | 349 | _txn_mutex = new std::shared_mutex[_txn_shard_size]; |
84 | 349 | _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; |
85 | 349 | _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
86 | | // For debugging |
87 | 349 | _tablet_version_cache = std::make_unique<TabletVersionCache>(100000); |
88 | 349 | } |
89 | | |
90 | | // prepare txn should always be allowed because ingest task will be retried |
91 | | // could not distinguish rollup, schema change or base table, prepare txn successfully will allow |
92 | | // ingest retried |
93 | | Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
94 | | TTransactionId transaction_id, const PUniqueId& load_id, |
95 | 0 | bool ingest) { |
96 | | // check if the tablet has already been shutdown. If it has, it indicates that |
97 | | // it is an old tablet, and data should not be imported into the old tablet. |
98 | | // Otherwise, it may lead to data loss during migration. |
99 | 0 | if (tablet.tablet_state() == TABLET_SHUTDOWN) { |
100 | 0 | return Status::InternalError<false>( |
101 | 0 | "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped " |
102 | 0 | "or migrationed. Please check if the table has been dropped or try again.", |
103 | 0 | tablet.tablet_id()); |
104 | 0 | } |
105 | 0 | return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(), |
106 | 0 | load_id, ingest); |
107 | 0 | } |
108 | | |
109 | | // most used for ut |
110 | | Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
111 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
112 | 986 | bool ingest) { |
113 | 986 | TxnKey key(partition_id, transaction_id); |
114 | 986 | TabletInfo tablet_info(tablet_id, tablet_uid); |
115 | 986 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
116 | 986 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
117 | | |
118 | 986 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { |
119 | 986 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
120 | 986 | LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") |
121 | 986 | .tag("txn_id", transaction_id) |
122 | 986 | .tag("tablet_id", tablet_id); |
123 | 986 | return Status::InternalError("debug prepare txn random failed"); |
124 | 986 | } |
125 | 986 | }); |
126 | 986 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { |
127 | 986 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
128 | 986 | LOG_WARNING("TxnManager.prepare_txn.wait") |
129 | 986 | .tag("txn_id", transaction_id) |
130 | 986 | .tag("tablet_id", tablet_id) |
131 | 986 | .tag("wait ms", wait); |
132 | 986 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
133 | 986 | } |
134 | 986 | }); |
135 | | |
136 | | /// Step 1: check if the transaction is already exist |
137 | 986 | do { |
138 | 986 | auto iter = txn_tablet_map.find(key); |
139 | 986 | if (iter == txn_tablet_map.end()) { |
140 | 172 | break; |
141 | 172 | } |
142 | | |
143 | | // exist TxnKey |
144 | 814 | auto& txn_tablet_info_map = iter->second; |
145 | 814 | auto load_itr = txn_tablet_info_map.find(tablet_info); |
146 | 814 | if (load_itr == txn_tablet_info_map.end()) { |
147 | 812 | break; |
148 | 812 | } |
149 | | |
150 | | // found load for txn,tablet |
151 | 2 | auto& load_info = load_itr->second; |
152 | | // case 1: user commit rowset, then the load id must be equal |
153 | | // check if load id is equal |
154 | 2 | if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() && |
155 | 2 | load_info->rowset != nullptr) { |
156 | 1 | LOG(WARNING) << "find transaction exists when add to engine." |
157 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
158 | 1 | << ", tablet: " << tablet_info.to_string(); |
159 | 1 | return Status::OK(); |
160 | 1 | } |
161 | 2 | } while (false); |
162 | | |
163 | | /// Step 2: check if there are too many transactions on running. |
164 | | // check if there are too many transactions on running. |
165 | | // if yes, reject the request. |
166 | 985 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
167 | 985 | if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) { |
168 | 0 | return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}", |
169 | 0 | txn_tablet_map.size(), |
170 | 0 | config::max_runnings_transactions_per_txn_map); |
171 | 0 | } |
172 | | |
173 | | /// Step 3: Add transaction to engine |
174 | | // not found load id |
175 | | // case 1: user start a new txn, rowset = null |
176 | | // case 2: loading txn from meta env |
177 | 985 | auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest); |
178 | 985 | load_info->prepare(); |
179 | 985 | if (!txn_tablet_map.contains(key)) { |
180 | 172 | g_tablet_txn_info_txn_partitions_count << 1; |
181 | 172 | } |
182 | 985 | txn_tablet_map[key][tablet_info] = std::move(load_info); |
183 | 985 | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
184 | 985 | VLOG_NOTICE << "add transaction to engine successfully." |
185 | 11 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
186 | 11 | << ", tablet: " << tablet_info.to_string(); |
187 | 985 | return Status::OK(); |
188 | 985 | } |
189 | | |
190 | | Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, |
191 | | TTransactionId transaction_id, const PUniqueId& load_id, |
192 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
193 | | bool is_recovery, |
194 | 973 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
195 | 973 | return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id, |
196 | 973 | tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr, |
197 | 973 | std::move(guard), is_recovery, partial_update_info); |
198 | 973 | } |
199 | | |
200 | | Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
201 | | TTransactionId transaction_id, const Version& version, |
202 | | TabletPublishStatistics* stats, |
203 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info, |
204 | 953 | const int64_t commit_tso) { |
205 | 953 | return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
206 | 953 | tablet->tablet_id(), tablet->tablet_uid(), version, stats, |
207 | 953 | extend_tablet_txn_info, commit_tso); |
208 | 953 | } |
209 | | |
210 | | void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, |
211 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
212 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
213 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
214 | |
|
215 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
216 | |
|
217 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
218 | 0 | auto it = txn_tablet_map.find(key); |
219 | 0 | if (it == txn_tablet_map.end()) { |
220 | 0 | return; |
221 | 0 | } |
222 | | |
223 | 0 | auto& tablet_txn_info_map = it->second; |
224 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
225 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
226 | 0 | return; |
227 | 0 | } |
228 | | |
229 | 0 | auto& txn_info = tablet_txn_info_iter->second; |
230 | 0 | txn_info->abort(); |
231 | 0 | } |
232 | | |
233 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
234 | | Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
235 | 5 | TTransactionId transaction_id) { |
236 | 5 | return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid()); |
237 | 5 | } |
238 | | |
239 | | Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
240 | 0 | TTransactionId transaction_id) { |
241 | 0 | return delete_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
242 | 0 | tablet->tablet_id(), tablet->tablet_uid()); |
243 | 0 | } |
244 | | |
245 | | void TxnManager::set_txn_related_delete_bitmap( |
246 | | TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
247 | | TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, |
248 | | const RowsetIdUnorderedSet& rowset_ids, |
249 | 950 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
250 | 950 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
251 | 950 | TabletInfo tablet_info(tablet_id, tablet_uid); |
252 | | |
253 | 950 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
254 | 950 | { |
255 | | // get tx |
256 | 950 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
257 | 950 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
258 | 950 | auto it = txn_tablet_map.find(key); |
259 | 950 | if (it == txn_tablet_map.end()) { |
260 | 1 | LOG(WARNING) << "transaction_id: " << transaction_id |
261 | 1 | << " partition_id: " << partition_id << " may be cleared"; |
262 | 1 | return; |
263 | 1 | } |
264 | 949 | auto load_itr = it->second.find(tablet_info); |
265 | 949 | if (load_itr == it->second.end()) { |
266 | 0 | LOG(WARNING) << "transaction_id: " << transaction_id |
267 | 0 | << " partition_id: " << partition_id << " tablet_id: " << tablet_id |
268 | 0 | << " may be cleared"; |
269 | 0 | return; |
270 | 0 | } |
271 | 949 | auto& load_info = load_itr->second; |
272 | 949 | load_info->unique_key_merge_on_write = unique_key_merge_on_write; |
273 | 949 | load_info->delete_bitmap = delete_bitmap; |
274 | 949 | load_info->rowset_ids = rowset_ids; |
275 | 949 | load_info->partial_update_info = partial_update_info; |
276 | 949 | } |
277 | 949 | } |
278 | | |
279 | | Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, |
280 | | TTransactionId transaction_id, TTabletId tablet_id, |
281 | | TabletUid tablet_uid, const PUniqueId& load_id, |
282 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
283 | | bool is_recovery, |
284 | 1.12k | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
285 | 1.12k | if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { |
286 | 0 | LOG(WARNING) << "invalid commit req " |
287 | 0 | << " partition_id=" << partition_id << " transaction_id=" << transaction_id |
288 | 0 | << " tablet_id=" << tablet_id; |
289 | 0 | return Status::InternalError("invalid partition id"); |
290 | 0 | } |
291 | | |
292 | 1.12k | pair<int64_t, int64_t> key(partition_id, transaction_id); |
293 | 1.12k | TabletInfo tablet_info(tablet_id, tablet_uid); |
294 | 1.12k | if (rowset_ptr == nullptr) { |
295 | 0 | return Status::Error<ROWSET_INVALID>( |
296 | 0 | "could not commit txn because rowset ptr is null. partition_id: {}, " |
297 | 0 | "transaction_id: {}, tablet: {}", |
298 | 0 | key.first, key.second, tablet_info.to_string()); |
299 | 0 | } |
300 | | |
301 | 1.12k | DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { |
302 | 1.12k | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
303 | 1.12k | LOG_WARNING("TxnManager.commit_txn.random_failed") |
304 | 1.12k | .tag("txn_id", transaction_id) |
305 | 1.12k | .tag("tablet_id", tablet_id); |
306 | 1.12k | return Status::InternalError("debug commit txn random failed"); |
307 | 1.12k | } |
308 | 1.12k | }); |
309 | 1.12k | DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { |
310 | 1.12k | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
311 | 1.12k | LOG_WARNING("TxnManager.commit_txn.wait") |
312 | 1.12k | .tag("txn_id", transaction_id) |
313 | 1.12k | .tag("tablet_id", tablet_id) |
314 | 1.12k | .tag("wait ms", wait); |
315 | 1.12k | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
316 | 1.12k | } |
317 | 1.12k | }); |
318 | | |
319 | 1.12k | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
320 | | // this while loop just run only once, just for if break |
321 | 1.12k | do { |
322 | | // get tx |
323 | 1.12k | std::shared_lock rdlock(_get_txn_map_lock(transaction_id)); |
324 | 1.12k | auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb(); |
325 | | // TODO(dx): remove log after fix partition id eq 0 bug |
326 | 1.12k | if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) { |
327 | 1 | rowset_ptr->rowset_meta()->set_partition_id(partition_id); |
328 | 1 | LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id=" |
329 | 1 | << partition_id; |
330 | 1 | } |
331 | 1.12k | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
332 | 1.12k | auto it = txn_tablet_map.find(key); |
333 | 1.12k | if (it == txn_tablet_map.end()) { |
334 | 24 | break; |
335 | 24 | } |
336 | | |
337 | 1.09k | auto load_itr = it->second.find(tablet_info); |
338 | 1.09k | if (load_itr == it->second.end()) { |
339 | 120 | break; |
340 | 120 | } |
341 | | |
342 | | // found load for txn,tablet |
343 | | // case 1: user commit rowset, then the load id must be equal |
344 | 976 | auto& load_info = load_itr->second; |
345 | | // check if load id is equal |
346 | 976 | if (load_info->rowset == nullptr) { |
347 | 974 | break; |
348 | 974 | } |
349 | | |
350 | 2 | if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) { |
351 | 0 | break; |
352 | 0 | } |
353 | | |
354 | | // find a rowset with same rowset id, then it means a duplicate call |
355 | 2 | if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) { |
356 | 1 | LOG(INFO) << "find rowset exists when commit transaction to engine." |
357 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
358 | 1 | << ", tablet: " << tablet_info.to_string() |
359 | 1 | << ", rowset_id: " << load_info->rowset->rowset_id(); |
360 | | // Should not remove this rowset from pending rowsets |
361 | 1 | load_info->pending_rs_guard = std::move(guard); |
362 | 1 | return Status::OK(); |
363 | 1 | } |
364 | | |
365 | | // find a rowset with different rowset id, then it should not happen, just return errors |
366 | 1 | return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( |
367 | 1 | "find rowset exists when commit transaction to engine. but rowset ids are not " |
368 | 1 | "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new " |
369 | 1 | "rowset_id: {}", |
370 | 1 | key.first, key.second, tablet_info.to_string(), |
371 | 1 | load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); |
372 | 2 | } while (false); |
373 | | |
374 | | // if not in recovery mode, then should persist the meta to meta env |
375 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
376 | | // it is under a single txn lock |
377 | 1.11k | if (!is_recovery) { |
378 | 984 | Status save_status = |
379 | 984 | RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), |
380 | 984 | rowset_ptr->rowset_meta()->get_rowset_pb(), false); |
381 | 984 | DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { |
382 | 984 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
383 | 984 | LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") |
384 | 984 | .tag("txn_id", transaction_id) |
385 | 984 | .tag("tablet_id", tablet_id) |
386 | 984 | .tag("wait ms", wait); |
387 | 984 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
388 | 984 | } |
389 | 984 | }); |
390 | 984 | if (!save_status.ok()) { |
391 | 0 | save_status.append(fmt::format(", txn id: {}", transaction_id)); |
392 | 0 | return save_status; |
393 | 0 | } |
394 | | |
395 | 984 | if (partial_update_info && partial_update_info->is_partial_update()) { |
396 | 42 | PartialUpdateInfoPB partial_update_info_pb; |
397 | 42 | partial_update_info->to_pb(&partial_update_info_pb); |
398 | 42 | save_status = RowsetMetaManager::save_partial_update_info( |
399 | 42 | meta, tablet_id, partition_id, transaction_id, partial_update_info_pb); |
400 | 42 | if (!save_status.ok()) { |
401 | 0 | save_status.append(fmt::format(", txn_id: {}", transaction_id)); |
402 | 0 | return save_status; |
403 | 0 | } |
404 | 42 | } |
405 | 984 | } |
406 | | |
407 | 1.11k | TabletSharedPtr tablet; |
408 | 1.11k | std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr}; |
409 | 1.11k | if (is_recovery) { |
410 | 134 | tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid); |
411 | 134 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
412 | 0 | PartialUpdateInfoPB partial_update_info_pb; |
413 | 0 | auto st = RowsetMetaManager::try_get_partial_update_info( |
414 | 0 | meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb); |
415 | 0 | if (st.ok()) { |
416 | 0 | decoded_partial_update_info = std::make_shared<PartialUpdateInfo>(); |
417 | 0 | decoded_partial_update_info->from_pb(&partial_update_info_pb); |
418 | 0 | DCHECK(decoded_partial_update_info->is_partial_update()); |
419 | 0 | } else if (!st.is<META_KEY_NOT_FOUND>()) { |
420 | | // the load is not a partial update |
421 | 0 | return st; |
422 | 0 | } |
423 | 0 | } |
424 | 134 | } |
425 | | |
426 | 1.11k | { |
427 | 1.11k | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
428 | 1.11k | auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr); |
429 | 1.11k | load_info->pending_rs_guard = std::move(guard); |
430 | 1.11k | if (is_recovery) { |
431 | 134 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
432 | 0 | load_info->unique_key_merge_on_write = true; |
433 | 0 | load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); |
434 | 0 | if (decoded_partial_update_info) { |
435 | 0 | LOG_INFO( |
436 | 0 | "get partial update info from RocksDB during recovery. txn_id={}, " |
437 | 0 | "partition_id={}, tablet_id={}, partial_update_info=[{}]", |
438 | 0 | transaction_id, partition_id, tablet_id, |
439 | 0 | decoded_partial_update_info->summary()); |
440 | 0 | load_info->partial_update_info = decoded_partial_update_info; |
441 | 0 | } |
442 | 0 | } |
443 | 134 | } |
444 | 1.11k | load_info->commit(); |
445 | | |
446 | 1.11k | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
447 | 1.11k | txn_tablet_map[key][tablet_info] = std::move(load_info); |
448 | 1.11k | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
449 | 1.11k | VLOG_NOTICE << "commit transaction to engine successfully." |
450 | 15 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
451 | 15 | << ", tablet: " << tablet_info.to_string() |
452 | 15 | << ", rowsetid: " << rowset_ptr->rowset_id() |
453 | 15 | << ", version: " << rowset_ptr->version().first; |
454 | 1.11k | } |
455 | 1.11k | return Status::OK(); |
456 | 1.11k | } |
457 | | |
// Publishes the rowset committed for (partition_id, transaction_id, tablet) at `version`
// and then drops the tablet's entry from the txn map.
//
// Steps performed by the code below:
//   1. Look up the TabletTxnInfo and its rowset under shared txn and txn-map locks; the
//      info is also handed back through `extend_tablet_txn_info`.
//   2. Call make_visible(version, commit_tso) on the rowset; for merge-on-write txns,
//      update and persist the delete bitmap.
//   3. Add the rowset to the binlog when the tablet has binlog enabled.
//   4. Persist the rowset meta; best-effort removal of persisted partial update info.
//   5. Under exclusive txn and txn-map locks, remove the tablet entry from the txn map.
//
// `stats` must be non-null (DCHECKed); lock_wait_time_us, calc_delete_bitmap_time_us and
// save_meta_time_us are updated on it.
// Returns OK without doing anything when the tablet id is unknown to the tablet manager,
// TRANSACTION_NOT_EXIST when no rowset is tracked for the key/tablet, and otherwise the
// first error met in the steps above.
458 | | // remove a txn from txn manager |
459 | | Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, |
460 | | TTransactionId transaction_id, TTabletId tablet_id, |
461 | | TabletUid tablet_uid, const Version& version, |
462 | | TabletPublishStatistics* stats, |
463 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info, |
464 | 966 | const int64_t commit_tso) { |
// The tablet is looked up by tablet_id only here; tablet_uid is used below for the map key
// and for saving the rowset meta.
465 | 966 | auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
466 | 966 | if (tablet == nullptr) { |
467 | 0 | return Status::OK(); |
468 | 0 | } |
469 | 966 | DCHECK(stats != nullptr); |
470 | | |
471 | 966 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
472 | 966 | TabletInfo tablet_info(tablet_id, tablet_uid); |
473 | 966 | RowsetSharedPtr rowset; |
474 | 966 | std::shared_ptr<TabletTxnInfo> tablet_txn_info; |
475 | 966 | int64_t t1 = MonotonicMicros(); |
476 | | /// Step 1: get rowset, tablet_txn_info by key |
477 | 966 | { |
// Both locks are shared and scoped to this block; they are released at its closing brace.
478 | 966 | std::shared_lock txn_rlock(_get_txn_lock(transaction_id)); |
479 | 966 | std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id)); |
480 | 966 | stats->lock_wait_time_us += MonotonicMicros() - t1; |
481 | | |
482 | 966 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
483 | 966 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
484 | 965 | auto& tablet_map = it->second; |
485 | 965 | if (auto txn_info_iter = tablet_map.find(tablet_info); |
486 | 965 | txn_info_iter != tablet_map.end()) { |
487 | | // found load for txn,tablet |
488 | | // case 1: user commit rowset, then the load id must be equal |
489 | 965 | tablet_txn_info = txn_info_iter->second; |
490 | 965 | extend_tablet_txn_info = tablet_txn_info; |
491 | 965 | rowset = tablet_txn_info->rowset; |
492 | 965 | } |
493 | 965 | } |
494 | 966 | } |
// rowset stays null both when the key/tablet is not tracked and when the tracked info has
// no rowset attached.
495 | 966 | if (rowset == nullptr) { |
496 | 1 | return Status::Error<TRANSACTION_NOT_EXIST>( |
497 | 1 | "publish txn failed, rowset not found. partition_id={}, transaction_id={}, " |
498 | 1 | "tablet={}, commit_tso={}", |
499 | 1 | partition_id, transaction_id, tablet_info.to_string(), commit_tso); |
500 | 1 | } |
501 | 965 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { |
502 | 965 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
503 | 965 | LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") |
504 | 965 | .tag("txn_id", transaction_id) |
505 | 965 | .tag("tablet_id", tablet_id); |
506 | 965 | return Status::InternalError("debug publish txn before save rs meta random failed"); |
507 | 965 | } |
508 | 965 | }); |
509 | 965 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { |
510 | 965 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
511 | 965 | LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") |
512 | 965 | .tag("txn_id", transaction_id) |
513 | 965 | .tag("tablet_id", tablet_id) |
514 | 965 | .tag("wait ms", wait); |
515 | 965 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
516 | 965 | } |
517 | 965 | }); |
518 | | |
519 | | /// Step 2: make rowset visible |
520 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
521 | | // it is under a single txn lock |
// NOTE(review): the shared locks taken in Step 1 went out of scope at the end of that block,
// so no lock acquired by this function is held from here until Step 5 - confirm whether the
// "under a single txn lock" remark above is still accurate.
522 | | // TODO(ygl): rowset is already set version here, memory is changed, if save failed |
523 | | // it maybe a fatal error |
524 | 965 | rowset->make_visible(version, commit_tso); |
525 | | |
526 | 965 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { |
527 | 965 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
528 | 965 | LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") |
529 | 965 | .tag("txn_id", transaction_id) |
530 | 965 | .tag("tablet_id", tablet_id); |
531 | 965 | return Status::InternalError("debug publish txn after save rs meta random failed"); |
532 | 965 | } |
533 | 965 | }); |
534 | 965 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { |
535 | 965 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
536 | 965 | LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") |
537 | 965 | .tag("txn_id", transaction_id) |
538 | 965 | .tag("tablet_id", tablet_id) |
539 | 965 | .tag("wait ms", wait); |
540 | 965 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
541 | 965 | } |
542 | 965 | }); |
543 | | // update delete_bitmap |
544 | 965 | if (tablet_txn_info->unique_key_merge_on_write) { |
545 | 941 | int64_t t2 = MonotonicMicros(); |
546 | 941 | if (rowset->num_segments() > 1 && |
547 | 941 | !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments( |
548 | 0 | rowset->rowset_id())) { |
549 | | // delete bitmap is empty, should re-calculate delete bitmaps between segments |
550 | 0 | std::vector<segment_v2::SegmentSharedPtr> segments; |
551 | 0 | RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); |
552 | 0 | RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments( |
553 | 0 | rowset->tablet_schema(), rowset->rowset_id(), segments, |
554 | 0 | tablet_txn_info->delete_bitmap)); |
555 | 0 | } |
556 | | |
557 | 941 | RETURN_IF_ERROR( |
558 | 941 | Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id)); |
559 | 941 | int64_t t3 = MonotonicMicros(); |
560 | 941 | stats->calc_delete_bitmap_time_us = t3 - t2; |
561 | 941 | RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap( |
562 | 941 | tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap, |
563 | 941 | version.second)); |
// save_meta_time_us is assigned here and then added to again in Step 4.
564 | 941 | stats->save_meta_time_us = MonotonicMicros() - t3; |
565 | 941 | } |
566 | | |
567 | | /// Step 3: add to binlog |
568 | 965 | auto enable_binlog = tablet->is_enable_binlog(); |
569 | 965 | if (enable_binlog) { |
570 | 0 | auto status = rowset->add_to_binlog(); |
571 | 0 | if (!status.ok()) { |
572 | 0 | return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>( |
573 | 0 | "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, " |
574 | 0 | "txn id: {}, status: {}", |
575 | 0 | rowset->rowset_id().to_string(), tablet_id, transaction_id, |
576 | 0 | status.to_string_no_stack()); |
577 | 0 | } |
578 | 0 | } |
579 | | |
580 | | /// Step 4: save meta |
581 | 965 | int64_t t5 = MonotonicMicros(); |
582 | 965 | auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(), |
583 | 965 | rowset->rowset_meta()->get_rowset_pb(), enable_binlog); |
584 | 965 | stats->save_meta_time_us += MonotonicMicros() - t5; |
585 | 965 | if (!status.ok()) { |
586 | 0 | status.append(fmt::format(", txn id: {}", transaction_id)); |
587 | 0 | return status; |
588 | 0 | } |
589 | | |
// Clean up the partial update info stored for this txn. `status` is reused for the result,
// so a failure here is logged and is also what the final `return status` hands back.
590 | 965 | if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info && |
591 | 965 | tablet_txn_info->partial_update_info->is_partial_update()) { |
592 | 42 | status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id, |
593 | 42 | transaction_id); |
594 | 42 | if (!status) { |
595 | | // discard the error status and print the warning log |
// NOTE(review): `status` is not reset to OK after this warning, so the error is returned at
// the end of the function rather than discarded - confirm which behaviour is intended.
596 | 0 | LOG_WARNING( |
597 | 0 | "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, " |
598 | 0 | "tablet_id={}, tablet_uid={}", |
599 | 0 | transaction_id, rowset->rowset_id().to_string(), tablet_id, |
600 | 0 | tablet_uid.to_string()); |
601 | 0 | } |
602 | 42 | } |
603 | | |
604 | | // TODO(Drogon): remove these test codes |
605 | 965 | if (enable_binlog) { |
606 | 0 | auto version_str = fmt::format("{}", version.first); |
607 | 0 | VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id, |
608 | 0 | version_str, tablet->get_binlog_filepath(version_str)); |
609 | 0 | } |
610 | | |
611 | | /// Step 5: remove tablet_info from tnx_tablet_map |
612 | | // txn_tablet_map[key] empty, remove key from txn_tablet_map |
// Same lock order as Step 1 (txn lock, then txn-map lock), but exclusive this time.
613 | 965 | int64_t t6 = MonotonicMicros(); |
614 | 965 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
615 | 965 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
616 | 965 | stats->lock_wait_time_us += MonotonicMicros() - t6; |
617 | 965 | _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
618 | 965 | wrlock); |
619 | 965 | VLOG_NOTICE << "publish txn successfully." |
620 | 7 | << " partition_id: " << key.first << ", txn_id: " << key.second |
621 | 7 | << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id() |
622 | 7 | << ", version: " << version.first << "," << version.second; |
623 | 965 | return status; |
624 | 965 | } |
625 | | |
626 | | void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id, |
627 | | TTransactionId transaction_id, |
628 | | TTabletId tablet_id, TabletUid tablet_uid, |
629 | | std::lock_guard<std::shared_mutex>& txn_lock, |
630 | 965 | std::lock_guard<std::shared_mutex>& wrlock) { |
631 | 965 | std::pair<int64_t, int64_t> key {partition_id, transaction_id}; |
632 | 965 | TabletInfo tablet_info {tablet_id, tablet_uid}; |
633 | 965 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
634 | 965 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
635 | 965 | it->second.erase(tablet_info); |
636 | 965 | if (it->second.empty()) { |
637 | 155 | txn_tablet_map.erase(it); |
638 | 155 | g_tablet_txn_info_txn_partitions_count << -1; |
639 | 155 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
640 | 155 | } |
641 | 965 | } |
642 | 965 | } |
643 | | |
644 | | void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id, |
645 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
646 | 0 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
647 | 0 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
648 | 0 | _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
649 | 0 | wrlock); |
650 | 0 | } |
651 | | |
652 | | // txn could be rollbacked if it does not have related rowset |
653 | | // if the txn has related rowset then could not rollback it, because it |
654 | | // may be committed in another thread and our current thread meets errors when writing to data file |
655 | | // BE has to wait for fe call clear txn api |
656 | | Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
657 | 7 | TTabletId tablet_id, TabletUid tablet_uid) { |
658 | 7 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
659 | 7 | TabletInfo tablet_info(tablet_id, tablet_uid); |
660 | | |
661 | 7 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
662 | 7 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
663 | | |
664 | 7 | auto it = txn_tablet_map.find(key); |
665 | 7 | if (it == txn_tablet_map.end()) { |
666 | 0 | return Status::OK(); |
667 | 0 | } |
668 | | |
669 | 7 | auto& tablet_txn_info_map = it->second; |
670 | 7 | if (auto load_itr = tablet_txn_info_map.find(tablet_info); |
671 | 7 | load_itr != tablet_txn_info_map.end()) { |
672 | | // found load for txn,tablet |
673 | | // case 1: user commit rowset, then the load id must be equal |
674 | 7 | const auto& load_info = load_itr->second; |
675 | 7 | if (load_info->rowset != nullptr) { |
676 | 1 | return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
677 | 1 | "if rowset is not null, it means other thread may commit the rowset should " |
678 | 1 | "not delete txn any more"); |
679 | 1 | } |
680 | 7 | } |
681 | | |
682 | 6 | tablet_txn_info_map.erase(tablet_info); |
683 | 6 | LOG(INFO) << "rollback transaction from engine successfully." |
684 | 6 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
685 | 6 | << ", tablet: " << tablet_info.to_string(); |
686 | 6 | if (tablet_txn_info_map.empty()) { |
687 | 6 | txn_tablet_map.erase(it); |
688 | 6 | g_tablet_txn_info_txn_partitions_count << -1; |
689 | 6 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
690 | 6 | } |
691 | 6 | return Status::OK(); |
692 | 7 | } |
693 | | |
694 | | // fe call this api to clear unused rowsets in be |
695 | | // could not delete the rowset if it already has a valid version |
696 | | Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, |
697 | | TTransactionId transaction_id, TTabletId tablet_id, |
698 | 4 | TabletUid tablet_uid) { |
699 | 4 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
700 | 4 | TabletInfo tablet_info(tablet_id, tablet_uid); |
701 | 4 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
702 | 4 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
703 | 4 | auto it = txn_tablet_map.find(key); |
704 | 4 | if (it == txn_tablet_map.end()) { |
705 | 0 | return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map"); |
706 | 0 | } |
707 | 4 | Status st = Status::OK(); |
708 | 4 | auto load_itr = it->second.find(tablet_info); |
709 | 4 | if (load_itr != it->second.end()) { |
710 | | // found load for txn,tablet |
711 | | // case 1: user commit rowset, then the load id must be equal |
712 | 4 | auto& load_info = load_itr->second; |
713 | 4 | auto& rowset = load_info->rowset; |
714 | 4 | if (rowset != nullptr && meta != nullptr) { |
715 | 3 | if (!rowset->is_pending()) { |
716 | 1 | st = Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
717 | 1 | "could not delete transaction from engine, just remove it from memory not " |
718 | 1 | "delete from disk, because related rowset already published. partition_id: " |
719 | 1 | "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}", |
720 | 1 | key.first, key.second, tablet_info.to_string(), |
721 | 1 | rowset->rowset_id().to_string(), rowset->version().to_string(), |
722 | 1 | RowsetStatePB_Name(rowset->rowset_meta_state())); |
723 | 2 | } else { |
724 | 2 | static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
725 | 2 | #ifndef BE_TEST |
726 | 2 | _engine.add_unused_rowset(rowset); |
727 | 2 | #endif |
728 | 2 | VLOG_NOTICE << "delete transaction from engine successfully." |
729 | 2 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
730 | 2 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
731 | 2 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
732 | 2 | } |
733 | 3 | } |
734 | 4 | it->second.erase(load_itr); |
735 | 4 | } |
736 | 4 | if (it->second.empty()) { |
737 | 4 | txn_tablet_map.erase(it); |
738 | 4 | g_tablet_txn_info_txn_partitions_count << -1; |
739 | 4 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
740 | 4 | } |
741 | 4 | return st; |
742 | 4 | } |
743 | | |
744 | | void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, |
745 | | int64_t* partition_id, |
746 | 6 | std::set<int64_t>* transaction_ids) { |
747 | 6 | if (partition_id == nullptr || transaction_ids == nullptr) { |
748 | 0 | LOG(WARNING) << "parameter is null when get transactions by tablet"; |
749 | 0 | return; |
750 | 0 | } |
751 | | |
752 | 6 | TabletInfo tablet_info(tablet_id, tablet_uid); |
753 | 12 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
754 | 6 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
755 | 6 | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
756 | 6 | for (auto& it : txn_tablet_map) { |
757 | 1 | if (it.second.find(tablet_info) != it.second.end()) { |
758 | 1 | *partition_id = it.first.first; |
759 | 1 | transaction_ids->insert(it.first.second); |
760 | 1 | VLOG_NOTICE << "find transaction on tablet." |
761 | 1 | << "partition_id: " << it.first.first |
762 | 1 | << ", transaction_id: " << it.first.second |
763 | 1 | << ", tablet: " << tablet_info.to_string(); |
764 | 1 | } |
765 | 1 | } |
766 | 6 | } |
767 | 6 | } |
768 | | |
769 | | // force drop all txns related with the tablet |
770 | | // maybe lock error, because not get txn lock before remove from meta |
771 | | void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
772 | 902 | TabletUid tablet_uid) { |
773 | 902 | TabletInfo tablet_info(tablet_id, tablet_uid); |
774 | 912k | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
775 | 911k | std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]); |
776 | 911k | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
777 | 912k | for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) { |
778 | 1.10k | auto load_itr = it->second.find(tablet_info); |
779 | 1.10k | if (load_itr != it->second.end()) { |
780 | 0 | auto& load_info = load_itr->second; |
781 | 0 | auto& rowset = load_info->rowset; |
782 | 0 | if (rowset != nullptr && meta != nullptr) { |
783 | 0 | LOG(INFO) << " delete transaction from engine " |
784 | 0 | << ", tablet: " << tablet_info.to_string() |
785 | 0 | << ", rowset id: " << rowset->rowset_id(); |
786 | 0 | static_cast<void>( |
787 | 0 | RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
788 | 0 | } |
789 | 0 | LOG(INFO) << "remove tablet related txn." |
790 | 0 | << " partition_id: " << it->first.first |
791 | 0 | << ", transaction_id: " << it->first.second |
792 | 0 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
793 | 0 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
794 | 0 | it->second.erase(load_itr); |
795 | 0 | } |
796 | 1.10k | if (it->second.empty()) { |
797 | 0 | _clear_txn_partition_map_unlocked(it->first.second, it->first.first); |
798 | 0 | it = txn_tablet_map.erase(it); |
799 | 0 | g_tablet_txn_info_txn_partitions_count << -1; |
800 | 1.10k | } else { |
801 | 1.10k | ++it; |
802 | 1.10k | } |
803 | 1.10k | } |
804 | 911k | } |
805 | 902 | if (meta != nullptr) { |
806 | 902 | Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id); |
807 | 902 | if (!st.ok()) { |
808 | 0 | LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id, |
809 | 0 | st.to_string()); |
810 | 0 | } |
811 | 902 | } |
812 | 902 | } |
813 | | |
814 | | void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, |
815 | | TPartitionId partition_id, |
816 | 175 | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) { |
817 | | // get tablets in this transaction |
818 | 175 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
819 | 175 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
820 | 175 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
821 | 175 | auto it = txn_tablet_map.find(key); |
822 | 175 | if (it == txn_tablet_map.end()) { |
823 | 2 | VLOG_NOTICE << "could not find tablet for" |
824 | 1 | << " partition_id=" << partition_id << ", transaction_id=" << transaction_id; |
825 | 2 | return; |
826 | 2 | } |
827 | 173 | auto& load_info_map = it->second; |
828 | | |
829 | | // each tablet |
830 | 989 | for (auto& load_info : load_info_map) { |
831 | 989 | const TabletInfo& tablet_info = load_info.first; |
832 | | // must not check rowset == null here, because if rowset == null |
833 | | // publish version should failed |
834 | 989 | tablet_infos->emplace(tablet_info, load_info.second->rowset); |
835 | 989 | } |
836 | 173 | } |
837 | | |
838 | 12 | void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { |
839 | 12.3k | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
840 | 12.2k | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
841 | 12.2k | for (auto& it : _txn_tablet_maps[i]) { |
842 | 268 | for (auto& tablet_load_it : it.second) { |
843 | 268 | tablet_infos->emplace(tablet_load_it.first); |
844 | 268 | } |
845 | 28 | } |
846 | 12.2k | } |
847 | 12 | } |
848 | | |
849 | | void TxnManager::get_all_commit_tablet_txn_info_by_tablet( |
850 | 72 | const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) { |
851 | 72.1k | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
852 | 72.0k | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
853 | 72.0k | for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) { |
854 | 148 | auto tablet_load_it = load_info_map.find(tablet.get_tablet_info()); |
855 | 148 | if (tablet_load_it != load_info_map.end()) { |
856 | 10 | const auto& [_, load_info] = *tablet_load_it; |
857 | 10 | const auto& rowset = load_info->rowset; |
858 | 10 | const auto& delete_bitmap = load_info->delete_bitmap; |
859 | 10 | if (!rowset || !delete_bitmap) { |
860 | 2 | continue; |
861 | 2 | } |
862 | 8 | commit_tablet_txn_info_vec->push_back({ |
863 | 8 | .transaction_id = txn_key.second, |
864 | 8 | .partition_id = txn_key.first, |
865 | 8 | .delete_bitmap = delete_bitmap, |
866 | 8 | .rowset_ids = load_info->rowset_ids, |
867 | 8 | .partial_update_info = load_info->partial_update_info, |
868 | 8 | }); |
869 | 8 | } |
870 | 148 | } |
871 | 72.0k | } |
872 | 72 | } |
873 | | |
874 | 29 | void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { |
875 | 29 | int64_t now = UnixSeconds(); |
876 | | // traverse the txn map, and get all expired txns |
877 | 29.7k | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
878 | 29.6k | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
879 | 29.6k | for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) { |
880 | 64 | auto txn_id = txn_key.second; |
881 | 612 | for (auto&& [tablet_info, txn_info] : tablet_txn_infos) { |
882 | 612 | double diff = difftime(now, txn_info->creation_time); |
883 | 612 | if (diff < config::pending_data_expire_time_sec) { |
884 | 612 | continue; |
885 | 612 | } |
886 | | |
887 | 0 | (*expire_txn_map)[tablet_info].push_back(txn_id); |
888 | 0 | if (VLOG_IS_ON(3)) { |
889 | 0 | VLOG_NOTICE << "find expired txn." |
890 | 0 | << " tablet=" << tablet_info.to_string() |
891 | 0 | << " transaction_id=" << txn_id << " exist_sec=" << diff; |
892 | 0 | } |
893 | 0 | } |
894 | 64 | } |
895 | 29.6k | } |
896 | 29 | } |
897 | | |
898 | | void TxnManager::get_partition_ids(const TTransactionId transaction_id, |
899 | 2 | std::vector<TPartitionId>* partition_ids) { |
900 | 2 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
901 | 2 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
902 | 2 | auto it = txn_partition_map.find(transaction_id); |
903 | 2 | if (it != txn_partition_map.end()) { |
904 | 1 | for (int64_t partition_id : it->second) { |
905 | 1 | partition_ids->push_back(partition_id); |
906 | 1 | } |
907 | 1 | } |
908 | 2 | } |
909 | | |
910 | 2.10k | void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
911 | 2.10k | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
912 | 2.10k | auto find = txn_partition_map.find(transaction_id); |
913 | 2.10k | if (find == txn_partition_map.end()) { |
914 | 196 | txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); |
915 | 196 | } |
916 | 2.10k | txn_partition_map[transaction_id].insert(partition_id); |
917 | 2.10k | } |
918 | | |
919 | 165 | void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
920 | 165 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
921 | 165 | auto it = txn_partition_map.find(transaction_id); |
922 | 165 | if (it != txn_partition_map.end()) { |
923 | 165 | it->second.erase(partition_id); |
924 | 165 | if (it->second.empty()) { |
925 | 165 | txn_partition_map.erase(it); |
926 | 165 | } |
927 | 165 | } |
928 | 165 | } |
929 | | |
930 | | void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
931 | 0 | DeltaWriter* delta_writer) { |
932 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
933 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
934 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
935 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
936 | 0 | auto find = txn_tablet_delta_writer_map.find(transaction_id); |
937 | 0 | if (find == txn_tablet_delta_writer_map.end()) { |
938 | 0 | txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>(); |
939 | 0 | } |
940 | 0 | txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer; |
941 | 0 | } |
942 | | |
943 | | void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, |
944 | 0 | int64_t node_id, bool is_succeed) { |
945 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
946 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
947 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
948 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
949 | 0 | auto find_txn = txn_tablet_delta_writer_map.find(transaction_id); |
950 | 0 | if (find_txn == txn_tablet_delta_writer_map.end()) { |
951 | 0 | LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id |
952 | 0 | << ", tablet_id=" << tablet_id; |
953 | 0 | return; |
954 | 0 | } |
955 | 0 | auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id); |
956 | 0 | if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) { |
957 | 0 | LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id |
958 | 0 | << ", tablet_id=" << tablet_id; |
959 | 0 | return; |
960 | 0 | } |
961 | 0 | DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id]; |
962 | 0 | delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed); |
963 | 0 | } |
964 | | |
965 | 0 | void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) { |
966 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
967 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
968 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
969 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
970 | 0 | auto it = txn_tablet_delta_writer_map.find(transaction_id); |
971 | 0 | if (it != txn_tablet_delta_writer_map.end()) { |
972 | 0 | txn_tablet_delta_writer_map.erase(it); |
973 | 0 | } |
974 | 0 | VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id; |
975 | 0 | } |
976 | | |
977 | 944 | int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) { |
978 | 944 | char key[16]; |
979 | 944 | memcpy(key, &tablet_id, sizeof(int64_t)); |
980 | 944 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
981 | 944 | CacheKey cache_key((const char*)&key, sizeof(key)); |
982 | | |
983 | 944 | auto* handle = _tablet_version_cache->lookup(cache_key); |
984 | 944 | if (handle == nullptr) { |
985 | 939 | return -1; |
986 | 939 | } |
987 | 5 | int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value; |
988 | 5 | _tablet_version_cache->release(handle); |
989 | 5 | return res; |
990 | 944 | } |
991 | | |
992 | 942 | void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) { |
993 | 942 | char key[16]; |
994 | 942 | memcpy(key, &tablet_id, sizeof(int64_t)); |
995 | 942 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
996 | 942 | CacheKey cache_key((const char*)&key, sizeof(key)); |
997 | | |
998 | 942 | auto* value = new CacheValue; |
999 | 942 | value->value = txn_id; |
1000 | 942 | auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id), |
1001 | 942 | CachePriority::NORMAL); |
1002 | 942 | _tablet_version_cache->release(handle); |
1003 | 942 | } |
1004 | | |
1005 | | TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
1006 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
1007 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
1008 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
1009 | |
|
1010 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
1011 | |
|
1012 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
1013 | 0 | auto it = txn_tablet_map.find(key); |
1014 | 0 | if (it == txn_tablet_map.end()) { |
1015 | 0 | return TxnState::NOT_FOUND; |
1016 | 0 | } |
1017 | | |
1018 | 0 | auto& tablet_txn_info_map = it->second; |
1019 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
1020 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
1021 | 0 | return TxnState::NOT_FOUND; |
1022 | 0 | } |
1023 | | |
1024 | 0 | const auto& txn_info = tablet_txn_info_iter->second; |
1025 | 0 | return txn_info->state; |
1026 | 0 | } |
1027 | | |
1028 | | } // namespace doris |