/root/doris/be/src/olap/txn_manager.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "txn_manager.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | #include <fmt/format.h> |
22 | | #include <fmt/ranges.h> |
23 | | #include <thrift/protocol/TDebugProtocol.h> |
24 | | #include <time.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <iterator> |
28 | | #include <list> |
29 | | #include <new> |
30 | | #include <ostream> |
31 | | #include <queue> |
32 | | #include <set> |
33 | | #include <string> |
34 | | |
35 | | #include "common/config.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/status.h" |
38 | | #include "olap/data_dir.h" |
39 | | #include "olap/delta_writer.h" |
40 | | #include "olap/olap_common.h" |
41 | | #include "olap/partial_update_info.h" |
42 | | #include "olap/rowset/beta_rowset.h" |
43 | | #include "olap/rowset/pending_rowset_helper.h" |
44 | | #include "olap/rowset/rowset_meta.h" |
45 | | #include "olap/rowset/rowset_meta_manager.h" |
46 | | #include "olap/schema_change.h" |
47 | | #include "olap/segment_loader.h" |
48 | | #include "olap/storage_engine.h" |
49 | | #include "olap/tablet_manager.h" |
50 | | #include "olap/tablet_meta.h" |
51 | | #include "olap/tablet_meta_manager.h" |
52 | | #include "olap/task/engine_publish_version_task.h" |
53 | | #include "util/debug_points.h" |
54 | | #include "util/time.h" |
55 | | |
56 | | namespace doris { |
57 | | class OlapMeta; |
58 | | } // namespace doris |
59 | | |
60 | | using std::map; |
61 | | using std::pair; |
62 | | using std::set; |
63 | | using std::string; |
64 | | using std::stringstream; |
65 | | using std::vector; |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
// bvar counter exported as "tablet_txn_info_txn_partitions_count".
// In this file it is incremented when prepare_txn() is about to add a (partition_id,
// transaction_id) key that is not yet present in a txn_tablet_map shard, and decremented when
// such a key is erased because its per-tablet map became empty.
70 | | bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count"); |
71 | | |
72 | | TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size) |
73 | | : _engine(engine), |
74 | | _txn_map_shard_size(txn_map_shard_size), |
75 | 207 | _txn_shard_size(txn_shard_size) { |
76 | 207 | DCHECK_GT(_txn_map_shard_size, 0); |
77 | 207 | DCHECK_GT(_txn_shard_size, 0); |
78 | 207 | DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0); |
79 | 207 | DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0); |
80 | 207 | _txn_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
81 | 207 | _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; |
82 | 207 | _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; |
83 | 207 | _txn_mutex = new std::shared_mutex[_txn_shard_size]; |
84 | 207 | _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; |
85 | 207 | _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
86 | | // For debugging |
87 | 207 | _tablet_version_cache = std::make_unique<TabletVersionCache>(100000); |
88 | 207 | } |
89 | | |
90 | | // prepare txn should always be allowed because ingest task will be retried |
91 | | // could not distinguish rollup, schema change or base table, prepare txn successfully will allow |
92 | | // ingest retried |
93 | | Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
94 | | TTransactionId transaction_id, const PUniqueId& load_id, |
95 | 28 | bool ingest) { |
96 | | // check if the tablet has already been shutdown. If it has, it indicates that |
97 | | // it is an old tablet, and data should not be imported into the old tablet. |
98 | | // Otherwise, it may lead to data loss during migration. |
99 | 28 | if (tablet.tablet_state() == TABLET_SHUTDOWN) { |
100 | 0 | return Status::InternalError<false>( |
101 | 0 | "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped " |
102 | 0 | "or migrationed. Please check if the table has been dropped or try again.", |
103 | 0 | tablet.tablet_id()); |
104 | 0 | } |
105 | 28 | return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(), |
106 | 28 | load_id, ingest); |
107 | 28 | } |
108 | | |
109 | | // most used for ut |
110 | | Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
111 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
112 | 36 | bool ingest) { |
113 | 36 | TxnKey key(partition_id, transaction_id); |
114 | 36 | TabletInfo tablet_info(tablet_id, tablet_uid); |
115 | 36 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
116 | 36 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
117 | | |
118 | 36 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { |
119 | 36 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
120 | 36 | LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") |
121 | 36 | .tag("txn_id", transaction_id) |
122 | 36 | .tag("tablet_id", tablet_id); |
123 | 36 | return Status::InternalError("debug prepare txn random failed"); |
124 | 36 | } |
125 | 36 | }); |
126 | 36 | DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { |
127 | 36 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
128 | 36 | LOG_WARNING("TxnManager.prepare_txn.wait") |
129 | 36 | .tag("txn_id", transaction_id) |
130 | 36 | .tag("tablet_id", tablet_id) |
131 | 36 | .tag("wait ms", wait); |
132 | 36 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
133 | 36 | } |
134 | 36 | }); |
135 | | |
136 | | /// Step 1: check if the transaction is already exist |
137 | 36 | do { |
138 | 36 | auto iter = txn_tablet_map.find(key); |
139 | 36 | if (iter == txn_tablet_map.end()) { |
140 | 32 | break; |
141 | 32 | } |
142 | | |
143 | | // exist TxnKey |
144 | 4 | auto& txn_tablet_info_map = iter->second; |
145 | 4 | auto load_itr = txn_tablet_info_map.find(tablet_info); |
146 | 4 | if (load_itr == txn_tablet_info_map.end()) { |
147 | 2 | break; |
148 | 2 | } |
149 | | |
150 | | // found load for txn,tablet |
151 | 2 | auto& load_info = load_itr->second; |
152 | | // case 1: user commit rowset, then the load id must be equal |
153 | | // check if load id is equal |
154 | 2 | auto& load_id = load_info->load_id; |
155 | 2 | if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() && |
156 | 2 | load_info->rowset != nullptr) { |
157 | 1 | LOG(WARNING) << "find transaction exists when add to engine." |
158 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
159 | 1 | << ", tablet: " << tablet_info.to_string(); |
160 | 1 | return Status::OK(); |
161 | 1 | } |
162 | 2 | } while (false); |
163 | | |
164 | | /// Step 2: check if there are too many transactions on running. |
165 | | // check if there are too many transactions on running. |
166 | | // if yes, reject the request. |
167 | 35 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
168 | 35 | if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) { |
169 | 0 | return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}", |
170 | 0 | txn_tablet_map.size(), |
171 | 0 | config::max_runnings_transactions_per_txn_map); |
172 | 0 | } |
173 | | |
174 | | /// Step 3: Add transaction to engine |
175 | | // not found load id |
176 | | // case 1: user start a new txn, rowset = null |
177 | | // case 2: loading txn from meta env |
178 | 35 | auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest); |
179 | 35 | load_info->prepare(); |
180 | 35 | if (!txn_tablet_map.contains(key)) { |
181 | 32 | g_tablet_txn_info_txn_partitions_count << 1; |
182 | 32 | } |
183 | 35 | txn_tablet_map[key][tablet_info] = std::move(load_info); |
184 | 35 | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
185 | 35 | VLOG_NOTICE << "add transaction to engine successfully." |
186 | 0 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
187 | 0 | << ", tablet: " << tablet_info.to_string(); |
188 | 35 | return Status::OK(); |
189 | 35 | } |
190 | | |
191 | | Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, |
192 | | TTransactionId transaction_id, const PUniqueId& load_id, |
193 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
194 | | bool is_recovery, |
195 | 23 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
196 | 23 | return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id, |
197 | 23 | tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr, |
198 | 23 | std::move(guard), is_recovery, partial_update_info); |
199 | 23 | } |
200 | | |
201 | | Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
202 | | TTransactionId transaction_id, const Version& version, |
203 | 3 | TabletPublishStatistics* stats) { |
204 | 3 | return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
205 | 3 | tablet->tablet_id(), tablet->tablet_uid(), version, stats); |
206 | 3 | } |
207 | | |
208 | | void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, |
209 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
210 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
211 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
212 | |
|
213 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
214 | |
|
215 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
216 | 0 | auto it = txn_tablet_map.find(key); |
217 | 0 | if (it == txn_tablet_map.end()) { |
218 | 0 | return; |
219 | 0 | } |
220 | | |
221 | 0 | auto& tablet_txn_info_map = it->second; |
222 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
223 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
224 | 0 | return; |
225 | 0 | } |
226 | | |
227 | 0 | auto& txn_info = tablet_txn_info_iter->second; |
228 | 0 | txn_info->abort(); |
229 | 0 | } |
230 | | |
231 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
232 | | Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
233 | 5 | TTransactionId transaction_id) { |
234 | 5 | return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid()); |
235 | 5 | } |
236 | | |
237 | | Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
238 | 0 | TTransactionId transaction_id) { |
239 | 0 | return delete_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
240 | 0 | tablet->tablet_id(), tablet->tablet_uid()); |
241 | 0 | } |
242 | | |
243 | | void TxnManager::set_txn_related_delete_bitmap( |
244 | | TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
245 | | TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, |
246 | | const RowsetIdUnorderedSet& rowset_ids, |
247 | 5 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
248 | 5 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
249 | 5 | TabletInfo tablet_info(tablet_id, tablet_uid); |
250 | | |
251 | 5 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
252 | 5 | { |
253 | | // get tx |
254 | 5 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
255 | 5 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
256 | 5 | auto it = txn_tablet_map.find(key); |
257 | 5 | if (it == txn_tablet_map.end()) { |
258 | 1 | LOG(WARNING) << "transaction_id: " << transaction_id |
259 | 1 | << " partition_id: " << partition_id << " may be cleared"; |
260 | 1 | return; |
261 | 1 | } |
262 | 4 | auto load_itr = it->second.find(tablet_info); |
263 | 4 | if (load_itr == it->second.end()) { |
264 | 0 | LOG(WARNING) << "transaction_id: " << transaction_id |
265 | 0 | << " partition_id: " << partition_id << " tablet_id: " << tablet_id |
266 | 0 | << " may be cleared"; |
267 | 0 | return; |
268 | 0 | } |
269 | 4 | auto& load_info = load_itr->second; |
270 | 4 | load_info->unique_key_merge_on_write = unique_key_merge_on_write; |
271 | 4 | load_info->delete_bitmap = delete_bitmap; |
272 | 4 | load_info->rowset_ids = rowset_ids; |
273 | 4 | load_info->partial_update_info = partial_update_info; |
274 | 4 | } |
275 | 4 | } |
276 | | |
277 | | Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, |
278 | | TTransactionId transaction_id, TTabletId tablet_id, |
279 | | TabletUid tablet_uid, const PUniqueId& load_id, |
280 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
281 | | bool is_recovery, |
282 | 34 | std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
283 | 34 | if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { |
284 | 0 | LOG(WARNING) << "invalid commit req " |
285 | 0 | << " partition_id=" << partition_id << " transaction_id=" << transaction_id |
286 | 0 | << " tablet_id=" << tablet_id; |
287 | 0 | return Status::InternalError("invalid partition id"); |
288 | 0 | } |
289 | | |
290 | 34 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
291 | 34 | TabletInfo tablet_info(tablet_id, tablet_uid); |
292 | 34 | if (rowset_ptr == nullptr) { |
293 | 0 | return Status::Error<ROWSET_INVALID>( |
294 | 0 | "could not commit txn because rowset ptr is null. partition_id: {}, " |
295 | 0 | "transaction_id: {}, tablet: {}", |
296 | 0 | key.first, key.second, tablet_info.to_string()); |
297 | 0 | } |
298 | | |
299 | 34 | DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { |
300 | 34 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
301 | 34 | LOG_WARNING("TxnManager.commit_txn.random_failed") |
302 | 34 | .tag("txn_id", transaction_id) |
303 | 34 | .tag("tablet_id", tablet_id); |
304 | 34 | return Status::InternalError("debug commit txn random failed"); |
305 | 34 | } |
306 | 34 | }); |
307 | 34 | DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { |
308 | 34 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
309 | 34 | LOG_WARNING("TxnManager.commit_txn.wait") |
310 | 34 | .tag("txn_id", transaction_id) |
311 | 34 | .tag("tablet_id", tablet_id) |
312 | 34 | .tag("wait ms", wait); |
313 | 34 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
314 | 34 | } |
315 | 34 | }); |
316 | | |
317 | 34 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
318 | | // this while loop just run only once, just for if break |
319 | 34 | do { |
320 | | // get tx |
321 | 34 | std::shared_lock rdlock(_get_txn_map_lock(transaction_id)); |
322 | 34 | auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb(); |
323 | | // TODO(dx): remove log after fix partition id eq 0 bug |
324 | 34 | if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) { |
325 | 1 | rowset_ptr->rowset_meta()->set_partition_id(partition_id); |
326 | 1 | LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id=" |
327 | 1 | << partition_id; |
328 | 1 | } |
329 | 34 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
330 | 34 | auto it = txn_tablet_map.find(key); |
331 | 34 | if (it == txn_tablet_map.end()) { |
332 | 8 | break; |
333 | 8 | } |
334 | | |
335 | 26 | auto load_itr = it->second.find(tablet_info); |
336 | 26 | if (load_itr == it->second.end()) { |
337 | 0 | break; |
338 | 0 | } |
339 | | |
340 | | // found load for txn,tablet |
341 | | // case 1: user commit rowset, then the load id must be equal |
342 | 26 | auto& load_info = load_itr->second; |
343 | | // check if load id is equal |
344 | 26 | if (load_info->rowset == nullptr) { |
345 | 24 | break; |
346 | 24 | } |
347 | | |
348 | 2 | if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) { |
349 | 0 | break; |
350 | 0 | } |
351 | | |
352 | | // find a rowset with same rowset id, then it means a duplicate call |
353 | 2 | if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) { |
354 | 1 | LOG(INFO) << "find rowset exists when commit transaction to engine." |
355 | 1 | << "partition_id: " << key.first << ", transaction_id: " << key.second |
356 | 1 | << ", tablet: " << tablet_info.to_string() |
357 | 1 | << ", rowset_id: " << load_info->rowset->rowset_id(); |
358 | | // Should not remove this rowset from pending rowsets |
359 | 1 | load_info->pending_rs_guard = std::move(guard); |
360 | 1 | return Status::OK(); |
361 | 1 | } |
362 | | |
363 | | // find a rowset with different rowset id, then it should not happen, just return errors |
364 | 1 | return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( |
365 | 1 | "find rowset exists when commit transaction to engine. but rowset ids are not " |
366 | 1 | "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new " |
367 | 1 | "rowset_id: {}", |
368 | 1 | key.first, key.second, tablet_info.to_string(), |
369 | 1 | load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); |
370 | 2 | } while (false); |
371 | | |
372 | | // if not in recovery mode, then should persist the meta to meta env |
373 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
374 | | // it is under a single txn lock |
375 | 32 | if (!is_recovery) { |
376 | 32 | Status save_status = |
377 | 32 | RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), |
378 | 32 | rowset_ptr->rowset_meta()->get_rowset_pb(), false); |
379 | 32 | DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { |
380 | 32 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
381 | 32 | LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") |
382 | 32 | .tag("txn_id", transaction_id) |
383 | 32 | .tag("tablet_id", tablet_id) |
384 | 32 | .tag("wait ms", wait); |
385 | 32 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
386 | 32 | } |
387 | 32 | }); |
388 | 32 | if (!save_status.ok()) { |
389 | 0 | save_status.append(fmt::format(", txn id: {}", transaction_id)); |
390 | 0 | return save_status; |
391 | 0 | } |
392 | | |
393 | 32 | if (partial_update_info && partial_update_info->is_partial_update()) { |
394 | 0 | PartialUpdateInfoPB partial_update_info_pb; |
395 | 0 | partial_update_info->to_pb(&partial_update_info_pb); |
396 | 0 | save_status = RowsetMetaManager::save_partial_update_info( |
397 | 0 | meta, tablet_id, partition_id, transaction_id, partial_update_info_pb); |
398 | 0 | if (!save_status.ok()) { |
399 | 0 | save_status.append(fmt::format(", txn_id: {}", transaction_id)); |
400 | 0 | return save_status; |
401 | 0 | } |
402 | 0 | } |
403 | 32 | } |
404 | | |
405 | 32 | TabletSharedPtr tablet; |
406 | 32 | std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr}; |
407 | 32 | if (is_recovery) { |
408 | 0 | tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid); |
409 | 0 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
410 | 0 | PartialUpdateInfoPB partial_update_info_pb; |
411 | 0 | auto st = RowsetMetaManager::try_get_partial_update_info( |
412 | 0 | meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb); |
413 | 0 | if (st.ok()) { |
414 | 0 | decoded_partial_update_info = std::make_shared<PartialUpdateInfo>(); |
415 | 0 | decoded_partial_update_info->from_pb(&partial_update_info_pb); |
416 | 0 | DCHECK(decoded_partial_update_info->is_partial_update()); |
417 | 0 | } else if (!st.is<META_KEY_NOT_FOUND>()) { |
418 | | // the load is not a partial update |
419 | 0 | return st; |
420 | 0 | } |
421 | 0 | } |
422 | 0 | } |
423 | | |
424 | 32 | { |
425 | 32 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
426 | 32 | auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr); |
427 | 32 | load_info->pending_rs_guard = std::move(guard); |
428 | 32 | if (is_recovery) { |
429 | 0 | if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
430 | 0 | load_info->unique_key_merge_on_write = true; |
431 | 0 | load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); |
432 | 0 | if (decoded_partial_update_info) { |
433 | 0 | LOG_INFO( |
434 | 0 | "get partial update info from RocksDB during recovery. txn_id={}, " |
435 | 0 | "partition_id={}, tablet_id={}, partial_update_info=[{}]", |
436 | 0 | transaction_id, partition_id, tablet_id, |
437 | 0 | decoded_partial_update_info->summary()); |
438 | 0 | load_info->partial_update_info = decoded_partial_update_info; |
439 | 0 | } |
440 | 0 | } |
441 | 0 | } |
442 | 32 | load_info->commit(); |
443 | | |
444 | 32 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
445 | 32 | txn_tablet_map[key][tablet_info] = std::move(load_info); |
446 | 32 | _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
447 | 32 | VLOG_NOTICE << "commit transaction to engine successfully." |
448 | 0 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
449 | 0 | << ", tablet: " << tablet_info.to_string() |
450 | 0 | << ", rowsetid: " << rowset_ptr->rowset_id() |
451 | 0 | << ", version: " << rowset_ptr->version().first; |
452 | 32 | } |
453 | 32 | return Status::OK(); |
454 | 32 | } |
455 | | |
456 | | // remove a txn from txn manager |
457 | | Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, |
458 | | TTransactionId transaction_id, TTabletId tablet_id, |
459 | | TabletUid tablet_uid, const Version& version, |
460 | 15 | TabletPublishStatistics* stats) { |
461 | 15 | auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
462 | 15 | if (tablet == nullptr) { |
463 | 0 | return Status::OK(); |
464 | 0 | } |
465 | 15 | DCHECK(stats != nullptr); |
466 | | |
467 | 15 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
468 | 15 | TabletInfo tablet_info(tablet_id, tablet_uid); |
469 | 15 | RowsetSharedPtr rowset; |
470 | 15 | std::shared_ptr<TabletTxnInfo> tablet_txn_info; |
471 | 15 | int64_t t1 = MonotonicMicros(); |
472 | | /// Step 1: get rowset, tablet_txn_info by key |
473 | 15 | { |
474 | 15 | std::shared_lock txn_rlock(_get_txn_lock(transaction_id)); |
475 | 15 | std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id)); |
476 | 15 | stats->lock_wait_time_us += MonotonicMicros() - t1; |
477 | | |
478 | 15 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
479 | 15 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
480 | 14 | auto& tablet_map = it->second; |
481 | 14 | if (auto txn_info_iter = tablet_map.find(tablet_info); |
482 | 14 | txn_info_iter != tablet_map.end()) { |
483 | | // found load for txn,tablet |
484 | | // case 1: user commit rowset, then the load id must be equal |
485 | 14 | tablet_txn_info = txn_info_iter->second; |
486 | 14 | rowset = tablet_txn_info->rowset; |
487 | 14 | } |
488 | 14 | } |
489 | 15 | } |
490 | 15 | if (rowset == nullptr) { |
491 | 1 | return Status::Error<TRANSACTION_NOT_EXIST>( |
492 | 1 | "publish txn failed, rowset not found. partition_id={}, transaction_id={}, " |
493 | 1 | "tablet={}", |
494 | 1 | partition_id, transaction_id, tablet_info.to_string()); |
495 | 1 | } |
496 | 14 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { |
497 | 14 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
498 | 14 | LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") |
499 | 14 | .tag("txn_id", transaction_id) |
500 | 14 | .tag("tablet_id", tablet_id); |
501 | 14 | return Status::InternalError("debug publish txn before save rs meta random failed"); |
502 | 14 | } |
503 | 14 | }); |
504 | 14 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { |
505 | 14 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
506 | 14 | LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") |
507 | 14 | .tag("txn_id", transaction_id) |
508 | 14 | .tag("tablet_id", tablet_id) |
509 | 14 | .tag("wait ms", wait); |
510 | 14 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
511 | 14 | } |
512 | 14 | }); |
513 | | |
514 | | /// Step 2: make rowset visible |
515 | | // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
516 | | // it is under a single txn lock |
517 | | // TODO(ygl): rowset is already set version here, memory is changed, if save failed |
518 | | // it maybe a fatal error |
519 | 14 | rowset->make_visible(version); |
520 | | |
521 | 14 | DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { |
522 | 14 | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
523 | 14 | LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") |
524 | 14 | .tag("txn_id", transaction_id) |
525 | 14 | .tag("tablet_id", tablet_id); |
526 | 14 | return Status::InternalError("debug publish txn after save rs meta random failed"); |
527 | 14 | } |
528 | 14 | }); |
529 | 14 | DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { |
530 | 14 | if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
531 | 14 | LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") |
532 | 14 | .tag("txn_id", transaction_id) |
533 | 14 | .tag("tablet_id", tablet_id) |
534 | 14 | .tag("wait ms", wait); |
535 | 14 | std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
536 | 14 | } |
537 | 14 | }); |
538 | | // update delete_bitmap |
539 | 14 | if (tablet_txn_info->unique_key_merge_on_write) { |
540 | 3 | int64_t t2 = MonotonicMicros(); |
541 | 3 | if (rowset->num_segments() > 1 && |
542 | 3 | !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments( |
543 | 0 | rowset->rowset_id())) { |
544 | | // delete bitmap is empty, should re-calculate delete bitmaps between segments |
545 | 0 | std::vector<segment_v2::SegmentSharedPtr> segments; |
546 | 0 | RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); |
547 | 0 | RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments( |
548 | 0 | rowset->rowset_id(), segments, tablet_txn_info->delete_bitmap)); |
549 | 0 | } |
550 | | |
551 | 3 | RETURN_IF_ERROR( |
552 | 3 | Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id)); |
553 | 3 | int64_t t3 = MonotonicMicros(); |
554 | 3 | stats->calc_delete_bitmap_time_us = t3 - t2; |
555 | 3 | RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap( |
556 | 3 | tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap, |
557 | 3 | version.second)); |
558 | 3 | stats->save_meta_time_us = MonotonicMicros() - t3; |
559 | 3 | } |
560 | | |
561 | | /// Step 3: add to binlog |
562 | 14 | auto enable_binlog = tablet->is_enable_binlog(); |
563 | 14 | if (enable_binlog) { |
564 | 0 | auto status = rowset->add_to_binlog(); |
565 | 0 | if (!status.ok()) { |
566 | 0 | return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>( |
567 | 0 | "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, " |
568 | 0 | "txn id: {}, status: {}", |
569 | 0 | rowset->rowset_id().to_string(), tablet_id, transaction_id, |
570 | 0 | status.to_string_no_stack()); |
571 | 0 | } |
572 | 0 | } |
573 | | |
574 | | /// Step 4: save meta |
575 | 14 | int64_t t5 = MonotonicMicros(); |
576 | 14 | auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(), |
577 | 14 | rowset->rowset_meta()->get_rowset_pb(), enable_binlog); |
578 | 14 | stats->save_meta_time_us += MonotonicMicros() - t5; |
579 | 14 | if (!status.ok()) { |
580 | 0 | status.append(fmt::format(", txn id: {}", transaction_id)); |
581 | 0 | return status; |
582 | 0 | } |
583 | | |
584 | 14 | if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info && |
585 | 14 | tablet_txn_info->partial_update_info->is_partial_update()) { |
586 | 0 | status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id, |
587 | 0 | transaction_id); |
588 | 0 | if (!status) { |
589 | | // discard the error status and print the warning log |
590 | 0 | LOG_WARNING( |
591 | 0 | "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, " |
592 | 0 | "tablet_id={}, tablet_uid={}", |
593 | 0 | transaction_id, rowset->rowset_id().to_string(), tablet_id, |
594 | 0 | tablet_uid.to_string()); |
595 | 0 | } |
596 | 0 | } |
597 | | |
598 | | // TODO(Drogon): remove these test codes |
599 | 14 | if (enable_binlog) { |
600 | 0 | auto version_str = fmt::format("{}", version.first); |
601 | 0 | VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id, |
602 | 0 | version_str, tablet->get_binlog_filepath(version_str)); |
603 | 0 | } |
604 | | |
605 | | /// Step 5: remove tablet_info from tnx_tablet_map |
606 | | // txn_tablet_map[key] empty, remove key from txn_tablet_map |
607 | 14 | int64_t t6 = MonotonicMicros(); |
608 | 14 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
609 | 14 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
610 | 14 | stats->lock_wait_time_us += MonotonicMicros() - t6; |
611 | 14 | _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
612 | 14 | wrlock); |
613 | 14 | VLOG_NOTICE << "publish txn successfully." |
614 | 0 | << " partition_id: " << key.first << ", txn_id: " << key.second |
615 | 0 | << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id() |
616 | 0 | << ", version: " << version.first << "," << version.second; |
617 | 14 | return status; |
618 | 14 | } |
619 | | |
620 | | void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id, |
621 | | TTransactionId transaction_id, |
622 | | TTabletId tablet_id, TabletUid tablet_uid, |
623 | | std::lock_guard<std::shared_mutex>& txn_lock, |
624 | 14 | std::lock_guard<std::shared_mutex>& wrlock) { |
625 | 14 | std::pair<int64_t, int64_t> key {partition_id, transaction_id}; |
626 | 14 | TabletInfo tablet_info {tablet_id, tablet_uid}; |
627 | 14 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
628 | 14 | if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
629 | 14 | it->second.erase(tablet_info); |
630 | 14 | if (it->second.empty()) { |
631 | 14 | txn_tablet_map.erase(it); |
632 | 14 | g_tablet_txn_info_txn_partitions_count << -1; |
633 | 14 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
634 | 14 | } |
635 | 14 | } |
636 | 14 | } |
637 | | |
638 | | void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id, |
639 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
640 | 0 | std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
641 | 0 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
642 | 0 | _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
643 | 0 | wrlock); |
644 | 0 | } |
645 | | |
646 | | // txn could be rollbacked if it does not have related rowset |
647 | | // if the txn has related rowset then could not rollback it, because it |
648 | | // may be committed in another thread and our current thread meets errors when writing to data file |
649 | | // BE has to wait for fe call clear txn api |
650 | | Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
651 | 7 | TTabletId tablet_id, TabletUid tablet_uid) { |
652 | 7 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
653 | 7 | TabletInfo tablet_info(tablet_id, tablet_uid); |
654 | | |
655 | 7 | std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
656 | 7 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
657 | | |
658 | 7 | auto it = txn_tablet_map.find(key); |
659 | 7 | if (it == txn_tablet_map.end()) { |
660 | 0 | return Status::OK(); |
661 | 0 | } |
662 | | |
663 | 7 | auto& tablet_txn_info_map = it->second; |
664 | 7 | if (auto load_itr = tablet_txn_info_map.find(tablet_info); |
665 | 7 | load_itr != tablet_txn_info_map.end()) { |
666 | | // found load for txn,tablet |
667 | | // case 1: user commit rowset, then the load id must be equal |
668 | 7 | const auto& load_info = load_itr->second; |
669 | 7 | if (load_info->rowset != nullptr) { |
670 | 1 | return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
671 | 1 | "if rowset is not null, it means other thread may commit the rowset should " |
672 | 1 | "not delete txn any more"); |
673 | 1 | } |
674 | 7 | } |
675 | | |
676 | 6 | tablet_txn_info_map.erase(tablet_info); |
677 | 6 | LOG(INFO) << "rollback transaction from engine successfully." |
678 | 6 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
679 | 6 | << ", tablet: " << tablet_info.to_string(); |
680 | 6 | if (tablet_txn_info_map.empty()) { |
681 | 6 | txn_tablet_map.erase(it); |
682 | 6 | g_tablet_txn_info_txn_partitions_count << -1; |
683 | 6 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
684 | 6 | } |
685 | 6 | return Status::OK(); |
686 | 7 | } |
687 | | |
688 | | // fe call this api to clear unused rowsets in be |
689 | | // could not delete the rowset if it already has a valid version |
690 | | Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, |
691 | | TTransactionId transaction_id, TTabletId tablet_id, |
692 | 3 | TabletUid tablet_uid) { |
693 | 3 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
694 | 3 | TabletInfo tablet_info(tablet_id, tablet_uid); |
695 | 3 | std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
696 | 3 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
697 | 3 | auto it = txn_tablet_map.find(key); |
698 | 3 | if (it == txn_tablet_map.end()) { |
699 | 0 | return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map"); |
700 | 0 | } |
701 | 3 | auto load_itr = it->second.find(tablet_info); |
702 | 3 | if (load_itr != it->second.end()) { |
703 | | // found load for txn,tablet |
704 | | // case 1: user commit rowset, then the load id must be equal |
705 | 3 | auto& load_info = load_itr->second; |
706 | 3 | auto& rowset = load_info->rowset; |
707 | 3 | if (rowset != nullptr && meta != nullptr) { |
708 | 2 | if (!rowset->is_pending()) { |
709 | 0 | return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
710 | 0 | "could not delete transaction from engine, just remove it from memory not " |
711 | 0 | "delete from disk, because related rowset already published. partition_id: " |
712 | 0 | "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}", |
713 | 0 | key.first, key.second, tablet_info.to_string(), |
714 | 0 | rowset->rowset_id().to_string(), rowset->version().to_string(), |
715 | 0 | RowsetStatePB_Name(rowset->rowset_meta_state())); |
716 | 2 | } else { |
717 | 2 | static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
718 | | #ifndef BE_TEST |
719 | | _engine.add_unused_rowset(rowset); |
720 | | #endif |
721 | 2 | VLOG_NOTICE << "delete transaction from engine successfully." |
722 | 0 | << " partition_id: " << key.first << ", transaction_id: " << key.second |
723 | 0 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
724 | 0 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
725 | 2 | } |
726 | 2 | } |
727 | 3 | it->second.erase(load_itr); |
728 | 3 | } |
729 | 3 | if (it->second.empty()) { |
730 | 3 | txn_tablet_map.erase(it); |
731 | 3 | g_tablet_txn_info_txn_partitions_count << -1; |
732 | 3 | _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
733 | 3 | } |
734 | 3 | return Status::OK(); |
735 | 3 | } |
736 | | |
737 | | void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, |
738 | | int64_t* partition_id, |
739 | 4 | std::set<int64_t>* transaction_ids) { |
740 | 4 | if (partition_id == nullptr || transaction_ids == nullptr) { |
741 | 0 | LOG(WARNING) << "parameter is null when get transactions by tablet"; |
742 | 0 | return; |
743 | 0 | } |
744 | | |
745 | 4 | TabletInfo tablet_info(tablet_id, tablet_uid); |
746 | 8 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
747 | 4 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
748 | 4 | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
749 | 4 | for (auto& it : txn_tablet_map) { |
750 | 0 | if (it.second.find(tablet_info) != it.second.end()) { |
751 | 0 | *partition_id = it.first.first; |
752 | 0 | transaction_ids->insert(it.first.second); |
753 | 0 | VLOG_NOTICE << "find transaction on tablet." |
754 | 0 | << "partition_id: " << it.first.first |
755 | 0 | << ", transaction_id: " << it.first.second |
756 | 0 | << ", tablet: " << tablet_info.to_string(); |
757 | 0 | } |
758 | 0 | } |
759 | 4 | } |
760 | 4 | } |
761 | | |
762 | | // force drop all txns related with the tablet |
763 | | // maybe lock error, because not get txn lock before remove from meta |
764 | | void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
765 | 0 | TabletUid tablet_uid) { |
766 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
767 | 0 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
768 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]); |
769 | 0 | txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
770 | 0 | for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) { |
771 | 0 | auto load_itr = it->second.find(tablet_info); |
772 | 0 | if (load_itr != it->second.end()) { |
773 | 0 | auto& load_info = load_itr->second; |
774 | 0 | auto& rowset = load_info->rowset; |
775 | 0 | if (rowset != nullptr && meta != nullptr) { |
776 | 0 | LOG(INFO) << " delete transaction from engine " |
777 | 0 | << ", tablet: " << tablet_info.to_string() |
778 | 0 | << ", rowset id: " << rowset->rowset_id(); |
779 | 0 | static_cast<void>( |
780 | 0 | RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
781 | 0 | } |
782 | 0 | LOG(INFO) << "remove tablet related txn." |
783 | 0 | << " partition_id: " << it->first.first |
784 | 0 | << ", transaction_id: " << it->first.second |
785 | 0 | << ", tablet: " << tablet_info.to_string() << ", rowset: " |
786 | 0 | << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
787 | 0 | it->second.erase(load_itr); |
788 | 0 | } |
789 | 0 | if (it->second.empty()) { |
790 | 0 | _clear_txn_partition_map_unlocked(it->first.second, it->first.first); |
791 | 0 | it = txn_tablet_map.erase(it); |
792 | 0 | g_tablet_txn_info_txn_partitions_count << -1; |
793 | 0 | } else { |
794 | 0 | ++it; |
795 | 0 | } |
796 | 0 | } |
797 | 0 | } |
798 | 0 | if (meta != nullptr) { |
799 | 0 | Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id); |
800 | 0 | if (!st.ok()) { |
801 | 0 | LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id, |
802 | 0 | st.to_string()); |
803 | 0 | } |
804 | 0 | } |
805 | 0 | } |
806 | | |
807 | | void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, |
808 | | TPartitionId partition_id, |
809 | 35 | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) { |
810 | | // get tablets in this transaction |
811 | 35 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
812 | 35 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
813 | 35 | txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
814 | 35 | auto it = txn_tablet_map.find(key); |
815 | 35 | if (it == txn_tablet_map.end()) { |
816 | 2 | VLOG_NOTICE << "could not find tablet for" |
817 | 0 | << " partition_id=" << partition_id << ", transaction_id=" << transaction_id; |
818 | 2 | return; |
819 | 2 | } |
820 | 33 | auto& load_info_map = it->second; |
821 | | |
822 | | // each tablet |
823 | 39 | for (auto& load_info : load_info_map) { |
824 | 39 | const TabletInfo& tablet_info = load_info.first; |
825 | | // must not check rowset == null here, because if rowset == null |
826 | | // publish version should failed |
827 | 39 | tablet_infos->emplace(tablet_info, load_info.second->rowset); |
828 | 39 | } |
829 | 33 | } |
830 | | |
831 | 0 | void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { |
832 | 0 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
833 | 0 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
834 | 0 | for (auto& it : _txn_tablet_maps[i]) { |
835 | 0 | for (auto& tablet_load_it : it.second) { |
836 | 0 | tablet_infos->emplace(tablet_load_it.first); |
837 | 0 | } |
838 | 0 | } |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | | void TxnManager::get_all_commit_tablet_txn_info_by_tablet( |
843 | 1 | const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) { |
844 | 2 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
845 | 1 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
846 | 2 | for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) { |
847 | 2 | auto tablet_load_it = load_info_map.find(tablet.get_tablet_info()); |
848 | 2 | if (tablet_load_it != load_info_map.end()) { |
849 | 2 | const auto& [_, load_info] = *tablet_load_it; |
850 | 2 | const auto& rowset = load_info->rowset; |
851 | 2 | const auto& delete_bitmap = load_info->delete_bitmap; |
852 | 2 | if (!rowset || !delete_bitmap) { |
853 | 1 | continue; |
854 | 1 | } |
855 | 1 | commit_tablet_txn_info_vec->push_back({ |
856 | 1 | .transaction_id = txn_key.second, |
857 | 1 | .partition_id = txn_key.first, |
858 | 1 | .delete_bitmap = delete_bitmap, |
859 | 1 | .rowset_ids = load_info->rowset_ids, |
860 | 1 | .partial_update_info = load_info->partial_update_info, |
861 | 1 | }); |
862 | 1 | } |
863 | 2 | } |
864 | 1 | } |
865 | 1 | } |
866 | | |
867 | 0 | void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { |
868 | 0 | int64_t now = UnixSeconds(); |
869 | | // traverse the txn map, and get all expired txns |
870 | 0 | for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
871 | 0 | std::shared_lock txn_rdlock(_txn_map_locks[i]); |
872 | 0 | for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) { |
873 | 0 | auto txn_id = txn_key.second; |
874 | 0 | for (auto&& [tablet_info, txn_info] : tablet_txn_infos) { |
875 | 0 | double diff = difftime(now, txn_info->creation_time); |
876 | 0 | if (diff < config::pending_data_expire_time_sec) { |
877 | 0 | continue; |
878 | 0 | } |
879 | | |
880 | 0 | (*expire_txn_map)[tablet_info].push_back(txn_id); |
881 | 0 | if (VLOG_IS_ON(3)) { |
882 | 0 | VLOG_NOTICE << "find expired txn." |
883 | 0 | << " tablet=" << tablet_info.to_string() |
884 | 0 | << " transaction_id=" << txn_id << " exist_sec=" << diff; |
885 | 0 | } |
886 | 0 | } |
887 | 0 | } |
888 | 0 | } |
889 | 0 | } |
890 | | |
891 | | void TxnManager::get_partition_ids(const TTransactionId transaction_id, |
892 | 0 | std::vector<TPartitionId>* partition_ids) { |
893 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
894 | 0 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
895 | 0 | auto it = txn_partition_map.find(transaction_id); |
896 | 0 | if (it != txn_partition_map.end()) { |
897 | 0 | for (int64_t partition_id : it->second) { |
898 | 0 | partition_ids->push_back(partition_id); |
899 | 0 | } |
900 | 0 | } |
901 | 0 | } |
902 | | |
903 | 67 | void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
904 | 67 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
905 | 67 | auto find = txn_partition_map.find(transaction_id); |
906 | 67 | if (find == txn_partition_map.end()) { |
907 | 40 | txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); |
908 | 40 | } |
909 | 67 | txn_partition_map[transaction_id].insert(partition_id); |
910 | 67 | } |
911 | | |
912 | 23 | void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
913 | 23 | txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
914 | 23 | auto it = txn_partition_map.find(transaction_id); |
915 | 23 | if (it != txn_partition_map.end()) { |
916 | 23 | it->second.erase(partition_id); |
917 | 23 | if (it->second.empty()) { |
918 | 23 | txn_partition_map.erase(it); |
919 | 23 | } |
920 | 23 | } |
921 | 23 | } |
922 | | |
923 | | void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
924 | 0 | DeltaWriter* delta_writer) { |
925 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
926 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
927 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
928 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
929 | 0 | auto find = txn_tablet_delta_writer_map.find(transaction_id); |
930 | 0 | if (find == txn_tablet_delta_writer_map.end()) { |
931 | 0 | txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>(); |
932 | 0 | } |
933 | 0 | txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer; |
934 | 0 | } |
935 | | |
936 | | void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, |
937 | 0 | int64_t node_id, bool is_succeed) { |
938 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
939 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
940 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
941 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
942 | 0 | auto find_txn = txn_tablet_delta_writer_map.find(transaction_id); |
943 | 0 | if (find_txn == txn_tablet_delta_writer_map.end()) { |
944 | 0 | LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id |
945 | 0 | << ", tablet_id=" << tablet_id; |
946 | 0 | return; |
947 | 0 | } |
948 | 0 | auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id); |
949 | 0 | if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) { |
950 | 0 | LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id |
951 | 0 | << ", tablet_id=" << tablet_id; |
952 | 0 | return; |
953 | 0 | } |
954 | 0 | DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id]; |
955 | 0 | delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed); |
956 | 0 | } |
957 | | |
958 | 0 | void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) { |
959 | 0 | std::lock_guard<std::shared_mutex> txn_wrlock( |
960 | 0 | _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
961 | 0 | txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
962 | 0 | _get_txn_tablet_delta_writer_map(transaction_id); |
963 | 0 | auto it = txn_tablet_delta_writer_map.find(transaction_id); |
964 | 0 | if (it != txn_tablet_delta_writer_map.end()) { |
965 | 0 | txn_tablet_delta_writer_map.erase(it); |
966 | 0 | } |
967 | 0 | VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id; |
968 | 0 | } |
969 | | |
970 | 6 | int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) { |
971 | 6 | char key[16]; |
972 | 6 | memcpy(key, &tablet_id, sizeof(int64_t)); |
973 | 6 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
974 | 6 | CacheKey cache_key((const char*)&key, sizeof(key)); |
975 | | |
976 | 6 | auto* handle = _tablet_version_cache->lookup(cache_key); |
977 | 6 | if (handle == nullptr) { |
978 | 1 | return -1; |
979 | 1 | } |
980 | 5 | int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value; |
981 | 5 | _tablet_version_cache->release(handle); |
982 | 5 | return res; |
983 | 6 | } |
984 | | |
985 | 4 | void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) { |
986 | 4 | char key[16]; |
987 | 4 | memcpy(key, &tablet_id, sizeof(int64_t)); |
988 | 4 | memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
989 | 4 | CacheKey cache_key((const char*)&key, sizeof(key)); |
990 | | |
991 | 4 | auto* value = new CacheValue; |
992 | 4 | value->value = txn_id; |
993 | 4 | auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id), |
994 | 4 | CachePriority::NORMAL); |
995 | 4 | _tablet_version_cache->release(handle); |
996 | 4 | } |
997 | | |
998 | | TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
999 | 0 | TTabletId tablet_id, TabletUid tablet_uid) { |
1000 | 0 | pair<int64_t, int64_t> key(partition_id, transaction_id); |
1001 | 0 | TabletInfo tablet_info(tablet_id, tablet_uid); |
1002 | |
|
1003 | 0 | std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
1004 | |
|
1005 | 0 | auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
1006 | 0 | auto it = txn_tablet_map.find(key); |
1007 | 0 | if (it == txn_tablet_map.end()) { |
1008 | 0 | return TxnState::NOT_FOUND; |
1009 | 0 | } |
1010 | | |
1011 | 0 | auto& tablet_txn_info_map = it->second; |
1012 | 0 | auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
1013 | 0 | if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
1014 | 0 | return TxnState::NOT_FOUND; |
1015 | 0 | } |
1016 | | |
1017 | 0 | const auto& txn_info = tablet_txn_info_iter->second; |
1018 | 0 | return txn_info->state; |
1019 | 0 | } |
1020 | | |
1021 | | } // namespace doris |