/root/doris/be/src/olap/txn_manager.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "txn_manager.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <fmt/ranges.h> |
22 | | #include <thrift/protocol/TDebugProtocol.h> |
23 | | #include <time.h> |
24 | | |
25 | | #include <filesystem> |
26 | | #include <iterator> |
27 | | #include <list> |
28 | | #include <new> |
29 | | #include <ostream> |
30 | | #include <queue> |
31 | | #include <set> |
32 | | #include <string> |
33 | | |
34 | | #include "common/config.h" |
35 | | #include "common/logging.h" |
36 | | #include "common/status.h" |
37 | | #include "olap/data_dir.h" |
38 | | #include "olap/delta_writer.h" |
39 | | #include "olap/olap_common.h" |
40 | | #include "olap/partial_update_info.h" |
41 | | #include "olap/rowset/pending_rowset_helper.h" |
42 | | #include "olap/rowset/rowset_meta.h" |
43 | | #include "olap/rowset/rowset_meta_manager.h" |
44 | | #include "olap/schema_change.h" |
45 | | #include "olap/segment_loader.h" |
46 | | #include "olap/storage_engine.h" |
47 | | #include "olap/tablet_manager.h" |
48 | | #include "olap/tablet_meta.h" |
49 | | #include "olap/tablet_meta_manager.h" |
50 | | #include "olap/task/engine_publish_version_task.h" |
51 | | #include "util/debug_points.h" |
52 | | #include "util/time.h" |
53 | | |
54 | | namespace doris { |
55 | | class OlapMeta; |
56 | | } // namespace doris |
57 | | |
58 | | using std::map; |
59 | | using std::pair; |
60 | | using std::set; |
61 | | using std::string; |
62 | | using std::stringstream; |
63 | | using std::vector; |
64 | | |
65 | | namespace doris { |
66 | | using namespace ErrorCode; |
67 | | |
68 | | TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size) |
69 | | : _engine(engine), |
70 | | _txn_map_shard_size(txn_map_shard_size), |
71 | 134 | _txn_shard_size(txn_shard_size) { |
72 | 134 | DCHECK_GT(_txn_map_shard_size, 0); |
73 | 134 | DCHECK_GT(_txn_shard_size, 0); |
74 | 134 | DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0); |
75 | 134 | DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0); |
76 | 134 | _txn_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
77 | 134 | _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; |
78 | 134 | _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; |
79 | 134 | _txn_mutex = new std::shared_mutex[_txn_shard_size]; |
80 | 134 | _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; |
81 | 134 | _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
82 | | // For debugging |
83 | 134 | _tablet_version_cache = std::make_unique<TabletVersionCache>(100000); |
84 | 134 | } |
85 | | |
86 | | // prepare txn should always be allowed because ingest task will be retried |
87 | | // could not distinguish rollup, schema change or base table, prepare txn successfully will allow |
88 | | // ingest retried |
89 | | Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
90 | | TTransactionId transaction_id, const PUniqueId& load_id, |
91 | 24 | bool ingest) { |
92 | | // check if the tablet has already been shutdown. If it has, it indicates that |
93 | | // it is an old tablet, and data should not be imported into the old tablet. |
94 | | // Otherwise, it may lead to data loss during migration. |
95 | 24 | if (tablet.tablet_state() == TABLET_SHUTDOWN) { |
96 | 0 | return Status::InternalError<false>( |
97 | 0 | "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped " |
98 | 0 | "or migrationed. Please check if the table has been dropped or try again.", |
99 | 0 | tablet.tablet_id()); |
100 | 0 | } |
101 | 24 | return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(), |
102 | 24 | load_id, ingest); |
103 | 24 | } |
104 | | |
105 | | // most used for ut |
106 | | Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
107 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
108 | 32 | bool ingest) { |
109 | 32 | TxnKey key(partition_id, transaction_id); |
110 | 32 | TabletInfo tablet_info(tablet_id, tablet_uid); |
111 | 32 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
112 | 32 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
113 | | |
114 | 32 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { |
115 | 32 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
116 | 32 | LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") |
117 | 32 | .tag("txn_id", transaction_id) |
118 | 32 | .tag("tablet_id", tablet_id); |
119 | 32 | return Status::InternalError("debug prepare txn random failed"); |
120 | 32 | } |
121 | 32 | }); |
122 | 32 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { |
123 | 32 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
124 | 32 | LOG_WARNING("TxnManager.prepare_txn.wait") |
125 | 32 | .tag("txn_id", transaction_id) |
126 | 32 | .tag("tablet_id", tablet_id) |
127 | 32 | .tag("wait ms", wait); |
128 | 32 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
129 | 32 | } |
130 | 32 | }); |
131 | | |
132 | | /// Step 1: check if the transaction is already exist |
133 | 32 | do { |
134 | 32 | auto iter = txn_tablet_map.find(key); |
135 | 32 | if (iter == txn_tablet_map.end()) { |
136 | 28 | break; |
137 | 28 | } |
138 | | |
139 | | // exist TxnKey |
140 | 4 | auto& txn_tablet_info_map = iter->second; |
141 | 4 | auto load_itr = txn_tablet_info_map.find(tablet_info); |
142 | 4 | if (load_itr == txn_tablet_info_map.end()) { |
143 | 2 | break; |
144 | 2 | } |
145 | | |
146 | | // found load for txn,tablet |
147 | 2 | auto& load_info = load_itr->second; |
148 | | // case 1: user commit rowset, then the load id must be equal |
149 | | // check if load id is equal |
150 | 2 | auto& load_id = load_info->load_id; |
151 | 2 | if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() && |
152 | 2 | load_info->rowset != nullptr) { |
153 | 1 | LOG(WARNING) << "find transaction exists when add to engine." |
154 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
155 | 1 | << ", tablet: " << tablet_info.to_string(); |
156 | 1 | return Status::OK(); |
157 | 1 | } |
158 | 2 | } while (false); |
159 | | |
160 | | /// Step 2: check if there are too many transactions on running. |
161 | | // check if there are too many transactions on running. |
162 | | // if yes, reject the request. |
163 | 31 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
164 | 31 | if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) { |
165 | 0 | return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}", |
166 | 0 | txn_tablet_map.size(), |
167 | 0 | config::max_runnings_transactions_per_txn_map); |
168 | 0 | } |
169 | | |
170 | | /// Step 3: Add transaction to engine |
171 | | // not found load id |
172 | | // case 1: user start a new txn, rowset = null |
173 | | // case 2: loading txn from meta env |
174 | 31 | auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest); |
175 | 31 | load_info->prepare(); |
176 | 31 | txn_tablet_map[key][tablet_info] = std::move(load_info); |
177 | 31 | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
178 | 31 | VLOG_NOTICE << "add transaction to engine successfully." |
179 | 0 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
180 | 0 | << ", tablet: " << tablet_info.to_string(); |
181 | 31 | return Status::OK(); |
182 | 31 | } |
183 | | |
184 | | Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, |
185 | | TTransactionId transaction_id, const PUniqueId& load_id, |
186 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
187 | | bool is_recovery, |
188 | 19 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
189 | 19 | return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id, |
190 | 19 | tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr, |
191 | 19 | std::move(guard), is_recovery, partial_update_info); |
192 | 19 | } |
193 | | |
194 | | Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
195 | | TTransactionId transaction_id, const Version& version, |
196 | 0 | TabletPublishStatistics* stats) { |
197 | 0 | return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
198 | 0 | tablet->tablet_id(), tablet->tablet_uid(), version, stats); |
199 | 0 | } |
200 | | |
201 | | void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, |
202 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
203 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
204 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
205 | |
|
206 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
207 | |
|
208 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
209 | 0 | auto it = txn_tablet_map.find(key); |
210 | 0 | if (it == txn_tablet_map.end()) { |
211 | 0 | return; |
212 | 0 | } |
213 | | |
214 | 0 | auto& tablet_txn_info_map = it->second; |
215 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
216 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
217 | 0 | return; |
218 | 0 | } |
219 | | |
220 | 0 | auto& txn_info = tablet_txn_info_iter->second; |
221 | 0 | txn_info->abort(); |
222 | 0 | } |
223 | | |
224 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
225 | | Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
226 | 5 | TTransactionId transaction_id) { |
227 | 5 | return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid()); |
228 | 5 | } |
229 | | |
230 | | Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
231 | 0 | TTransactionId transaction_id) { |
232 | 0 | return delete_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
233 | 0 | tablet->tablet_id(), tablet->tablet_uid()); |
234 | 0 | } |
235 | | |
236 | | void TxnManager::set_txn_related_delete_bitmap( |
237 | | TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
238 | | TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, |
239 | | const RowsetIdUnorderedSet& rowset_ids, |
240 | 4 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
241 | 4 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
242 | 4 | TabletInfo tablet_info(tablet_id, tablet_uid); |
243 | | |
244 | 4 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
245 | 4 | { |
246 | | // get tx |
247 | 4 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
248 | 4 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
249 | 4 | auto it = txn_tablet_map.find(key); |
250 | 4 | if (it == txn_tablet_map.end()) { |
251 | 1 | LOG(WARNING) << "transaction_id: " << transaction_id |
252 | 1 | << " partition_id: " << partition_id << " may be cleared"; |
253 | 1 | return; |
254 | 1 | } |
255 | 3 | auto load_itr = it->second.find(tablet_info); |
256 | 3 | if (load_itr == it->second.end()) { |
257 | 0 | LOG(WARNING) << "transaction_id: " << transaction_id |
258 | 0 | << " partition_id: " << partition_id << " tablet_id: " << tablet_id |
259 | 0 | << " may be cleared"; |
260 | 0 | return; |
261 | 0 | } |
262 | 3 | auto& load_info = load_itr->second; |
263 | 3 | load_info->unique_key_merge_on_write = unique_key_merge_on_write; |
264 | 3 | load_info->delete_bitmap = delete_bitmap; |
265 | 3 | load_info->rowset_ids = rowset_ids; |
266 | 3 | load_info->partial_update_info = partial_update_info; |
267 | 3 | } |
268 | 3 | } |
269 | | |
270 | | Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, |
271 | | TTransactionId transaction_id, TTabletId tablet_id, |
272 | | TabletUid tablet_uid, const PUniqueId& load_id, |
273 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
274 | | bool is_recovery, |
275 | 30 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
276 | 30 | if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { |
277 | 0 | LOG(WARNING) << "invalid commit req " |
278 | 0 | << " partition_id=" << partition_id << " transaction_id=" << transaction_id |
279 | 0 | << " tablet_id=" << tablet_id; |
280 | 0 | return Status::InternalError("invalid partition id"); |
281 | 0 | } |
282 | | |
283 | 30 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
284 | 30 | TabletInfo tablet_info(tablet_id, tablet_uid); |
285 | 30 | if (rowset_ptr == nullptr) { |
286 | 0 | return Status::Error<ROWSET_INVALID>( |
287 | 0 | "could not commit txn because rowset ptr is null. partition_id: {}, " |
288 | 0 | "transaction_id: {}, tablet: {}", |
289 | 0 | key.first, key.second, tablet_info.to_string()); |
290 | 0 | } |
291 | | |
292 | 30 | DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { |
293 | 30 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
294 | 30 | LOG_WARNING("TxnManager.commit_txn.random_failed") |
295 | 30 | .tag("txn_id", transaction_id) |
296 | 30 | .tag("tablet_id", tablet_id); |
297 | 30 | return Status::InternalError("debug commit txn random failed"); |
298 | 30 | } |
299 | 30 | }); |
300 | 30 | DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { |
301 | 30 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
302 | 30 | LOG_WARNING("TxnManager.commit_txn.wait") |
303 | 30 | .tag("txn_id", transaction_id) |
304 | 30 | .tag("tablet_id", tablet_id) |
305 | 30 | .tag("wait ms", wait); |
306 | 30 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
307 | 30 | } |
308 | 30 | }); |
309 | | |
310 | 30 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
311 | | // this while loop just run only once, just for if break |
312 | 30 | do { |
313 | | // get tx |
314 | 30 | std::shared_lock rdlock(_get_txn_map_lock(transaction_id)); |
315 | 30 | auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb(); |
316 | | // TODO(dx): remove log after fix partition id eq 0 bug |
317 | 30 | if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) { |
318 | 1 | rowset_ptr->rowset_meta()->set_partition_id(partition_id); |
319 | 1 | LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id=" |
320 | 1 | << partition_id; |
321 | 1 | } |
322 | 30 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
323 | 30 | auto it = txn_tablet_map.find(key); |
324 | 30 | if (it == txn_tablet_map.end()) { |
325 | 8 | break; |
326 | 8 | } |
327 | | |
328 | 22 | auto load_itr = it->second.find(tablet_info); |
329 | 22 | if (load_itr == it->second.end()) { |
330 | 0 | break; |
331 | 0 | } |
332 | | |
333 | | // found load for txn,tablet |
334 | | // case 1: user commit rowset, then the load id must be equal |
335 | 22 | auto& load_info = load_itr->second; |
336 | | // check if load id is equal |
337 | 22 | if (load_info->rowset == nullptr) { |
338 | 20 | break; |
339 | 20 | } |
340 | | |
341 | 2 | if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) { |
342 | 0 | break; |
343 | 0 | } |
344 | | |
345 | | // find a rowset with same rowset id, then it means a duplicate call |
346 | 2 | if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) { |
347 | 1 | LOG(INFO) << "find rowset exists when commit transaction to engine." |
348 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
349 | 1 | << ", tablet: " << tablet_info.to_string() |
350 | 1 | << ", rowset_id: " << load_info->rowset->rowset_id(); |
351 | | // Should not remove this rowset from pending rowsets |
352 | 1 | load_info->pending_rs_guard = std::move(guard); |
353 | 1 | return Status::OK(); |
354 | 1 | } |
355 | | |
356 | | // find a rowset with different rowset id, then it should not happen, just return errors |
357 | 1 | return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( |
358 | 1 | "find rowset exists when commit transaction to engine. but rowset ids are not " |
359 | 1 | "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new " |
360 | 1 | "rowset_id: {}", |
361 | 1 | key.first, key.second, tablet_info.to_string(), |
362 | 1 | load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); |
363 | 2 | } while (false); |
364 | | |
365 | | // if not in recovery mode, then should persist the meta to meta env |
366 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
367 | | // it is under a single txn lock |
368 | 28 | if (!is_recovery) { |
369 | 28 | Status save_status = |
370 | 28 | RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), |
371 | 28 | rowset_ptr->rowset_meta()->get_rowset_pb(), false); |
372 | 28 | DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { |
373 | 28 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
374 | 28 | LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") |
375 | 28 | .tag("txn_id", transaction_id) |
376 | 28 | .tag("tablet_id", tablet_id) |
377 | 28 | .tag("wait ms", wait); |
378 | 28 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
379 | 28 | } |
380 | 28 | }); |
381 | 28 | if (!save_status.ok()) { |
382 | 0 | save_status.append(fmt::format(", txn id: {}", transaction_id)); |
383 | 0 | return save_status; |
384 | 0 | } |
385 | | |
386 | 28 | if (partial_update_info && partial_update_info->is_partial_update) { |
387 | 0 | PartialUpdateInfoPB partial_update_info_pb; |
388 | 0 | partial_update_info->to_pb(&partial_update_info_pb); |
389 | 0 | save_status = RowsetMetaManager::save_partial_update_info( |
390 | 0 | meta, tablet_id, partition_id, transaction_id, partial_update_info_pb); |
391 | 0 | if (!save_status.ok()) { |
392 | 0 | save_status.append(fmt::format(", txn_id: {}", transaction_id)); |
393 | 0 | return save_status; |
394 | 0 | } |
395 | 0 | } |
396 | 28 | } |
397 | | |
398 | 28 | TabletSharedPtr tablet; |
399 | 28 | std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr}; |
400 | 28 | if (is_recovery) { |
401 | 0 | tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid); |
402 | 0 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
403 | 0 | PartialUpdateInfoPB partial_update_info_pb; |
404 | 0 | auto st = RowsetMetaManager::try_get_partial_update_info( |
405 | 0 | meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb); |
406 | 0 | if (st.ok()) { |
407 | 0 | decoded_partial_update_info = std::make_shared<PartialUpdateInfo>(); |
408 | 0 | decoded_partial_update_info->from_pb(&partial_update_info_pb); |
409 | 0 | DCHECK(decoded_partial_update_info->is_partial_update); |
410 | 0 | } else if (!st.is<META_KEY_NOT_FOUND>()) { |
411 | | // the load is not a partial update |
412 | 0 | return st; |
413 | 0 | } |
414 | 0 | } |
415 | 0 | } |
416 | | |
417 | 28 | { |
418 | 28 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
419 | 28 | auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr); |
420 | 28 | load_info->pending_rs_guard = std::move(guard); |
421 | 28 | if (is_recovery) { |
422 | 0 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
423 | 0 | load_info->unique_key_merge_on_write = true; |
424 | 0 | load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); |
425 | 0 | if (decoded_partial_update_info) { |
426 | 0 | LOG_INFO( |
427 | 0 | "get partial update info from RocksDB during recovery. txn_id={}, " |
428 | 0 | "partition_id={}, tablet_id={}, partial_update_info=[{}]", |
429 | 0 | transaction_id, partition_id, tablet_id, |
430 | 0 | decoded_partial_update_info->summary()); |
431 | 0 | load_info->partial_update_info = decoded_partial_update_info; |
432 | 0 | } |
433 | 0 | } |
434 | 0 | } |
435 | 28 | load_info->commit(); |
436 | | |
437 | 28 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
438 | 28 | txn_tablet_map[key][tablet_info] = std::move(load_info); |
439 | 28 | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
440 | 28 | VLOG_NOTICE << "commit transaction to engine successfully." |
441 | 0 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
442 | 0 | << ", tablet: " << tablet_info.to_string() |
443 | 0 | << ", rowsetid: " << rowset_ptr->rowset_id() |
444 | 0 | << ", version: " << rowset_ptr->version().first; |
445 | 28 | } |
446 | 28 | return Status::OK(); |
447 | 28 | } |
448 | | |
449 | | // remove a txn from txn manager |
450 | | Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, |
451 | | TTransactionId transaction_id, TTabletId tablet_id, |
452 | | TabletUid tablet_uid, const Version& version, |
453 | 11 | TabletPublishStatistics* stats) { |
454 | 11 | auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
455 | 11 | if (tablet == nullptr) { |
456 | 0 | return Status::OK(); |
457 | 0 | } |
458 | 11 | DCHECK(stats != nullptr); |
459 | | |
460 | 11 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
461 | 11 | TabletInfo tablet_info(tablet_id, tablet_uid); |
462 | 11 | RowsetSharedPtr rowset; |
463 | 11 | std::shared_ptr<TabletTxnInfo> tablet_txn_info; |
464 | 11 | int64_t t1 = MonotonicMicros(); |
465 | | /// Step 1: get rowset, tablet_txn_info by key |
466 | 11 | { |
467 | 11 | std::shared_lock txn_rlock(_get_txn_lock(transaction_id)); |
468 | 11 | std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id)); |
469 | 11 | stats->lock_wait_time_us += MonotonicMicros() - t1; |
470 | | |
471 | 11 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
472 | 11 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
473 | 10 | auto& tablet_map = it->second; |
474 | 10 | if (auto txn_info_iter = tablet_map.find(tablet_info); |
475 | 10 | txn_info_iter != tablet_map.end()) { |
476 | | // found load for txn,tablet |
477 | | // case 1: user commit rowset, then the load id must be equal |
478 | 10 | tablet_txn_info = txn_info_iter->second; |
479 | 10 | rowset = tablet_txn_info->rowset; |
480 | 10 | } |
481 | 10 | } |
482 | 11 | } |
483 | 11 | if (rowset == nullptr) { |
484 | 1 | return Status::Error<TRANSACTION_NOT_EXIST>( |
485 | 1 | "publish txn failed, rowset not found. partition_id={}, transaction_id={}, " |
486 | 1 | "tablet={}", |
487 | 1 | partition_id, transaction_id, tablet_info.to_string()); |
488 | 1 | } |
489 | 10 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { |
490 | 10 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
491 | 10 | LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") |
492 | 10 | .tag("txn_id", transaction_id) |
493 | 10 | .tag("tablet_id", tablet_id); |
494 | 10 | return Status::InternalError("debug publish txn before save rs meta random failed"); |
495 | 10 | } |
496 | 10 | }); |
497 | 10 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { |
498 | 10 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
499 | 10 | LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") |
500 | 10 | .tag("txn_id", transaction_id) |
501 | 10 | .tag("tablet_id", tablet_id) |
502 | 10 | .tag("wait ms", wait); |
503 | 10 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
504 | 10 | } |
505 | 10 | }); |
506 | | |
507 | | /// Step 2: make rowset visible |
508 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
509 | | // it is under a single txn lock |
510 | | // TODO(ygl): rowset is already set version here, memory is changed, if save failed |
511 | | // it maybe a fatal error |
512 | 10 | rowset->make_visible(version); |
513 | | |
514 | 10 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { |
515 | 10 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
516 | 10 | LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") |
517 | 10 | .tag("txn_id", transaction_id) |
518 | 10 | .tag("tablet_id", tablet_id); |
519 | 10 | return Status::InternalError("debug publish txn after save rs meta random failed"); |
520 | 10 | } |
521 | 10 | }); |
522 | 10 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { |
523 | 10 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
524 | 10 | LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") |
525 | 10 | .tag("txn_id", transaction_id) |
526 | 10 | .tag("tablet_id", tablet_id) |
527 | 10 | .tag("wait ms", wait); |
528 | 10 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
529 | 10 | } |
530 | 10 | }); |
531 | | // update delete_bitmap |
532 | 10 | if (tablet_txn_info->unique_key_merge_on_write) { |
533 | 2 | int64_t t2 = MonotonicMicros(); |
534 | 2 | RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id)); |
535 | 2 | int64_t t3 = MonotonicMicros(); |
536 | 2 | stats->calc_delete_bitmap_time_us = t3 - t2; |
537 | 2 | RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap( |
538 | 2 | tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap, |
539 | 2 | version.second)); |
540 | 2 | stats->save_meta_time_us = MonotonicMicros() - t3; |
541 | 2 | } |
542 | | |
543 | | /// Step 3: add to binlog |
544 | 10 | auto enable_binlog = tablet->is_enable_binlog(); |
545 | 10 | if (enable_binlog) { |
546 | 0 | auto status = rowset->add_to_binlog(); |
547 | 0 | if (!status.ok()) { |
548 | 0 | return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>( |
549 | 0 | "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, " |
550 | 0 | "txn id: {}", |
551 | 0 | rowset->rowset_id().to_string(), tablet_id, transaction_id); |
552 | 0 | } |
553 | 0 | } |
554 | | |
555 | | /// Step 4: save meta |
556 | 10 | int64_t t5 = MonotonicMicros(); |
557 | 10 | auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(), |
558 | 10 | rowset->rowset_meta()->get_rowset_pb(), enable_binlog); |
559 | 10 | stats->save_meta_time_us += MonotonicMicros() - t5; |
560 | 10 | if (!status.ok()) { |
561 | 0 | status.append(fmt::format(", txn id: {}", transaction_id)); |
562 | 0 | return status; |
563 | 0 | } |
564 | | |
565 | 10 | if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info && |
566 | 10 | tablet_txn_info->partial_update_info->is_partial_update) { |
567 | 0 | status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id, |
568 | 0 | transaction_id); |
569 | 0 | if (!status) { |
570 | | // discard the error status and print the warning log |
571 | 0 | LOG_WARNING( |
572 | 0 | "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, " |
573 | 0 | "tablet_id={}, tablet_uid={}", |
574 | 0 | transaction_id, rowset->rowset_id().to_string(), tablet_id, |
575 | 0 | tablet_uid.to_string()); |
576 | 0 | } |
577 | 0 | } |
578 | | |
579 | | // TODO(Drogon): remove these test codes |
580 | 10 | if (enable_binlog) { |
581 | 0 | auto version_str = fmt::format("{}", version.first); |
582 | 0 | VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id, |
583 | 0 | version_str, tablet->get_binlog_filepath(version_str)); |
584 | 0 | } |
585 | | |
586 | | /// Step 5: remove tablet_info from tnx_tablet_map |
587 | | // txn_tablet_map[key] empty, remove key from txn_tablet_map |
588 | 10 | int64_t t6 = MonotonicMicros(); |
589 | 10 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
590 | 10 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
591 | 10 | stats->lock_wait_time_us += MonotonicMicros() - t6; |
592 | 10 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
593 | 10 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
594 | 10 | it->second.erase(tablet_info); |
595 | 10 | VLOG_NOTICE << "publish txn successfully." |
596 | 0 | << " partition_id: " << key.first << ", txn_id: " << key.second |
597 | 0 | << ", tablet_id: " << tablet_info.tablet_id |
598 | 0 | << ", rowsetid: " << rowset->rowset_id() << ", version: " << version.first |
599 | 0 | << "," << version.second; |
600 | 10 | if (it->second.empty()) { |
601 | 10 | txn_tablet_map.erase(it); |
602 | 10 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
603 | 10 | } |
604 | 10 | } |
605 | | |
606 | 10 | return status; |
607 | 10 | } |
608 | | |
609 | | // txn could be rollbacked if it does not have related rowset |
610 | | // if the txn has related rowset then could not rollback it, because it |
611 | | // may be committed in another thread and our current thread meets errors when writing to data file |
612 | | // BE has to wait for fe call clear txn api |
613 | | Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
614 | 7 | TTabletId tablet_id, TabletUid tablet_uid) { |
615 | 7 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
616 | 7 | TabletInfo tablet_info(tablet_id, tablet_uid); |
617 | | |
618 | 7 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
619 | 7 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
620 | | |
621 | 7 | auto it = txn_tablet_map.find(key); |
622 | 7 | if (it == txn_tablet_map.end()) { |
623 | 0 | return Status::OK(); |
624 | 0 | } |
625 | | |
626 | 7 | auto& tablet_txn_info_map = it->second; |
627 | 7 | if (auto load_itr = tablet_txn_info_map.find(tablet_info); |
628 | 7 | load_itr != tablet_txn_info_map.end()) { |
629 | | // found load for txn,tablet |
630 | | // case 1: user commit rowset, then the load id must be equal |
631 | 7 | const auto& load_info = load_itr->second; |
632 | 7 | if (load_info->rowset != nullptr) { |
633 | 1 | return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
634 | 1 | "if rowset is not null, it means other thread may commit the rowset should " |
635 | 1 | "not delete txn any more"); |
636 | 1 | } |
637 | 7 | } |
638 | | |
639 | 6 | tablet_txn_info_map.erase(tablet_info); |
640 | 6 | LOG(INFO) << "rollback transaction from engine successfully." |
641 | 6 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
642 | 6 | << ", tablet: " << tablet_info.to_string(); |
643 | 6 | if (tablet_txn_info_map.empty()) { |
644 | 6 | txn_tablet_map.erase(it); |
645 | 6 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
646 | 6 | } |
647 | 6 | return Status::OK(); |
648 | 7 | } |
649 | | |
650 | | // fe call this api to clear unused rowsets in be |
651 | | // could not delete the rowset if it already has a valid version |
652 | | Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, |
653 | | TTransactionId transaction_id, TTabletId tablet_id, |
654 | 3 | TabletUid tablet_uid) { |
655 | 3 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
656 | 3 | TabletInfo tablet_info(tablet_id, tablet_uid); |
657 | 3 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
658 | 3 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
659 | 3 | auto it = txn_tablet_map.find(key); |
660 | 3 | if (it == txn_tablet_map.end()) { |
661 | 0 | return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map"); |
662 | 0 | } |
663 | 3 | auto load_itr = it->second.find(tablet_info); |
664 | 3 | if (load_itr != it->second.end()) { |
665 | | // found load for txn,tablet |
666 | | // case 1: user commit rowset, then the load id must be equal |
667 | 3 | auto& load_info = load_itr->second; |
668 | 3 | auto& rowset = load_info->rowset; |
669 | 3 | if (rowset != nullptr && meta != nullptr) { |
670 | 2 | if (!rowset->is_pending()) { |
671 | 0 | return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
672 | 0 | "could not delete transaction from engine, just remove it from memory not " |
673 | 0 | "delete from disk, because related rowset already published. partition_id: " |
674 | 0 | "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}", |
675 | 0 | key.first, key.second, tablet_info.to_string(), |
676 | 0 | rowset->rowset_id().to_string(), rowset->version().to_string(), |
677 | 0 | RowsetStatePB_Name(rowset->rowset_meta_state())); |
678 | 2 | } else { |
679 | 2 | static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
680 | | #ifndef BE_TEST |
681 | | _engine.add_unused_rowset(rowset); |
682 | | #endif |
683 | 2 | VLOG_NOTICE << "delete transaction from engine successfully." |
684 | 0 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
685 | 0 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
686 | 0 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
687 | 2 | } |
688 | 2 | } |
689 | 3 | it->second.erase(load_itr); |
690 | 3 | } |
691 | 3 | if (it->second.empty()) { |
692 | 3 | txn_tablet_map.erase(it); |
693 | 3 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
694 | 3 | } |
695 | 3 | return Status::OK(); |
696 | 3 | } |
697 | | |
698 | | void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, |
699 | | int64_t* partition_id, |
700 | 4 | std::set<int64_t>* transaction_ids) { |
701 | 4 | if (partition_id == nullptr || transaction_ids == nullptr) { |
702 | 0 | LOG(WARNING) << "parameter is null when get transactions by tablet"; |
703 | 0 | return; |
704 | 0 | } |
705 | | |
706 | 4 | TabletInfo tablet_info(tablet_id, tablet_uid); |
707 | 8 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
708 | 4 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
709 | 4 | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
710 | 4 | for (auto& it : txn_tablet_map) { |
711 | 0 | if (it.second.find(tablet_info) != it.second.end()) { |
712 | 0 | *partition_id = it.first.first; |
713 | 0 | transaction_ids->insert(it.first.second); |
714 | 0 | VLOG_NOTICE << "find transaction on tablet." |
715 | 0 | << "partition_id: " << it.first.first |
716 | 0 | << ", transaction_id: " << it.first.second |
717 | 0 | << ", tablet: " << tablet_info.to_string(); |
718 | 0 | } |
719 | 0 | } |
720 | 4 | } |
721 | 4 | } |
722 | | |
723 | | // force drop all txns related with the tablet |
724 | | // maybe lock error, because not get txn lock before remove from meta |
725 | | void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
726 | 0 | TabletUid tablet_uid) { |
727 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
728 | 0 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
729 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]); |
730 | 0 | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
731 | 0 | for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) { |
732 | 0 | auto load_itr = it->second.find(tablet_info); |
733 | 0 | if (load_itr != it->second.end()) { |
734 | 0 | auto& load_info = load_itr->second; |
735 | 0 | auto& rowset = load_info->rowset; |
736 | 0 | if (rowset != nullptr && meta != nullptr) { |
737 | 0 | LOG(INFO) << " delete transaction from engine " |
738 | 0 | << ", tablet: " << tablet_info.to_string() |
739 | 0 | << ", rowset id: " << rowset->rowset_id(); |
740 | 0 | static_cast<void>( |
741 | 0 | RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
742 | 0 | } |
743 | 0 | LOG(INFO) << "remove tablet related txn." |
744 | 0 | << " partition_id: " << it->first.first |
745 | 0 | << ", transaction_id: " << it->first.second |
746 | 0 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
747 | 0 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
748 | 0 | it->second.erase(load_itr); |
749 | 0 | } |
750 | 0 | if (it->second.empty()) { |
751 | 0 | _clear_txn_partition_map_unlocked(it->first.second, it->first.first); |
752 | 0 | it = txn_tablet_map.erase(it); |
753 | 0 | } else { |
754 | 0 | ++it; |
755 | 0 | } |
756 | 0 | } |
757 | 0 | } |
758 | 0 | if (meta != nullptr) { |
759 | 0 | Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id); |
760 | 0 | if (!st.ok()) { |
761 | 0 | LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id, |
762 | 0 | st.to_string()); |
763 | 0 | } |
764 | 0 | } |
765 | 0 | } |
766 | | |
767 | | void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, |
768 | | TPartitionId partition_id, |
769 | 31 | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) { |
770 | | // get tablets in this transaction |
771 | 31 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
772 | 31 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
773 | 31 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
774 | 31 | auto it = txn_tablet_map.find(key); |
775 | 31 | if (it == txn_tablet_map.end()) { |
776 | 2 | VLOG_NOTICE << "could not find tablet for" |
777 | 0 | << " partition_id=" << partition_id << ", transaction_id=" << transaction_id; |
778 | 2 | return; |
779 | 2 | } |
780 | 29 | auto& load_info_map = it->second; |
781 | | |
782 | | // each tablet |
783 | 35 | for (auto& load_info : load_info_map) { |
784 | 35 | const TabletInfo& tablet_info = load_info.first; |
785 | | // must not check rowset == null here, because if rowset == null |
786 | | // publish version should failed |
787 | 35 | tablet_infos->emplace(tablet_info, load_info.second->rowset); |
788 | 35 | } |
789 | 29 | } |
790 | | |
791 | 17 | void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { |
792 | 34 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
793 | 17 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
794 | 17 | for (auto& it : _txn_tablet_maps[i]) { |
795 | 0 | for (auto& tablet_load_it : it.second) { |
796 | 0 | tablet_infos->emplace(tablet_load_it.first); |
797 | 0 | } |
798 | 0 | } |
799 | 17 | } |
800 | 17 | } |
801 | | |
802 | | void TxnManager::get_all_commit_tablet_txn_info_by_tablet( |
803 | 1 | const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) { |
804 | 2 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
805 | 1 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
806 | 2 | for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) { |
807 | 2 | auto tablet_load_it = load_info_map.find(tablet->get_tablet_info()); |
808 | 2 | if (tablet_load_it != load_info_map.end()) { |
809 | 2 | const auto& [_, load_info] = *tablet_load_it; |
810 | 2 | const auto& rowset = load_info->rowset; |
811 | 2 | const auto& delete_bitmap = load_info->delete_bitmap; |
812 | 2 | if (!rowset || !delete_bitmap) { |
813 | 1 | continue; |
814 | 1 | } |
815 | 1 | commit_tablet_txn_info_vec->push_back({ |
816 | 1 | .transaction_id = txn_key.second, |
817 | 1 | .partition_id = txn_key.first, |
818 | 1 | .delete_bitmap = delete_bitmap, |
819 | 1 | .rowset_ids = load_info->rowset_ids, |
820 | 1 | .partial_update_info = load_info->partial_update_info, |
821 | 1 | }); |
822 | 1 | } |
823 | 2 | } |
824 | 1 | } |
825 | 1 | } |
826 | | |
827 | 0 | void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { |
828 | 0 | int64_t now = UnixSeconds(); |
829 | | // traverse the txn map, and get all expired txns |
830 | 0 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
831 | 0 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
832 | 0 | for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) { |
833 | 0 | auto txn_id = txn_key.second; |
834 | 0 | for (auto&& [tablet_info, txn_info] : tablet_txn_infos) { |
835 | 0 | double diff = difftime(now, txn_info->creation_time); |
836 | 0 | if (diff < config::pending_data_expire_time_sec) { |
837 | 0 | continue; |
838 | 0 | } |
839 | | |
840 | 0 | (*expire_txn_map)[tablet_info].push_back(txn_id); |
841 | 0 | if (VLOG_IS_ON(3)) { |
842 | 0 | VLOG_NOTICE << "find expired txn." |
843 | 0 | << " tablet=" << tablet_info.to_string() |
844 | 0 | << " transaction_id=" << txn_id << " exist_sec=" << diff; |
845 | 0 | } |
846 | 0 | } |
847 | 0 | } |
848 | 0 | } |
849 | 0 | } |
850 | | |
851 | | void TxnManager::get_partition_ids(const TTransactionId transaction_id, |
852 | 0 | std::vector<TPartitionId>* partition_ids) { |
853 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
854 | 0 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
855 | 0 | auto it = txn_partition_map.find(transaction_id); |
856 | 0 | if (it != txn_partition_map.end()) { |
857 | 0 | for (int64_t partition_id : it->second) { |
858 | 0 | partition_ids->push_back(partition_id); |
859 | 0 | } |
860 | 0 | } |
861 | 0 | } |
862 | | |
863 | 59 | void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
864 | 59 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
865 | 59 | auto find = txn_partition_map.find(transaction_id); |
866 | 59 | if (find == txn_partition_map.end()) { |
867 | 36 | txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); |
868 | 36 | } |
869 | 59 | txn_partition_map[transaction_id].insert(partition_id); |
870 | 59 | } |
871 | | |
872 | 19 | void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
873 | 19 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
874 | 19 | auto it = txn_partition_map.find(transaction_id); |
875 | 19 | if (it != txn_partition_map.end()) { |
876 | 19 | it->second.erase(partition_id); |
877 | 19 | if (it->second.empty()) { |
878 | 19 | txn_partition_map.erase(it); |
879 | 19 | } |
880 | 19 | } |
881 | 19 | } |
882 | | |
883 | | void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
884 | 0 | DeltaWriter* delta_writer) { |
885 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
886 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
887 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
888 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
889 | 0 | auto find = txn_tablet_delta_writer_map.find(transaction_id); |
890 | 0 | if (find == txn_tablet_delta_writer_map.end()) { |
891 | 0 | txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>(); |
892 | 0 | } |
893 | 0 | txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer; |
894 | 0 | } |
895 | | |
896 | | void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, |
897 | 0 | int64_t node_id, bool is_succeed) { |
898 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
899 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
900 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
901 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
902 | 0 | auto find_txn = txn_tablet_delta_writer_map.find(transaction_id); |
903 | 0 | if (find_txn == txn_tablet_delta_writer_map.end()) { |
904 | 0 | LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id |
905 | 0 | << ", tablet_id=" << tablet_id; |
906 | 0 | return; |
907 | 0 | } |
908 | 0 | auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id); |
909 | 0 | if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) { |
910 | 0 | LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id |
911 | 0 | << ", tablet_id=" << tablet_id; |
912 | 0 | return; |
913 | 0 | } |
914 | 0 | DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id]; |
915 | 0 | delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed); |
916 | 0 | } |
917 | | |
918 | 0 | void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) { |
919 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
920 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
921 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
922 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
923 | 0 | auto it = txn_tablet_delta_writer_map.find(transaction_id); |
924 | 0 | if (it != txn_tablet_delta_writer_map.end()) { |
925 | 0 | txn_tablet_delta_writer_map.erase(it); |
926 | 0 | } |
927 | 0 | VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id; |
928 | 0 | } |
929 | | |
930 | 6 | int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) { |
931 | 6 | char key[16]; |
932 | 6 | memcpy(key, &tablet_id, sizeof(int64_t)); |
933 | 6 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
934 | 6 | CacheKey cache_key((const char*)&key, sizeof(key)); |
935 | | |
936 | 6 | auto* handle = _tablet_version_cache->lookup(cache_key); |
937 | 6 | if (handle == nullptr) { |
938 | 1 | return -1; |
939 | 1 | } |
940 | 5 | int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value; |
941 | 5 | _tablet_version_cache->release(handle); |
942 | 5 | return res; |
943 | 6 | } |
944 | | |
945 | 4 | void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) { |
946 | 4 | char key[16]; |
947 | 4 | memcpy(key, &tablet_id, sizeof(int64_t)); |
948 | 4 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
949 | 4 | CacheKey cache_key((const char*)&key, sizeof(key)); |
950 | | |
951 | 4 | auto* value = new CacheValue; |
952 | 4 | value->value = txn_id; |
953 | 4 | auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id), |
954 | 4 | CachePriority::NORMAL); |
955 | 4 | _tablet_version_cache->release(handle); |
956 | 4 | } |
957 | | |
958 | | TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
959 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
960 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
961 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
962 | |
|
963 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
964 | |
|
965 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
966 | 0 | auto it = txn_tablet_map.find(key); |
967 | 0 | if (it == txn_tablet_map.end()) { |
968 | 0 | return TxnState::NOT_FOUND; |
969 | 0 | } |
970 | | |
971 | 0 | auto& tablet_txn_info_map = it->second; |
972 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
973 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
974 | 0 | return TxnState::NOT_FOUND; |
975 | 0 | } |
976 | | |
977 | 0 | const auto& txn_info = tablet_txn_info_iter->second; |
978 | 0 | return txn_info->state; |
979 | 0 | } |
980 | | |
981 | | } // namespace doris |