/root/doris/be/src/olap/txn_manager.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <butil/macros.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/types.pb.h> |
23 | | #include <stddef.h> |
24 | | #include <stdint.h> |
25 | | |
26 | | #include <boost/container/detail/std_fwd.hpp> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <set> |
31 | | #include <shared_mutex> |
32 | | #include <unordered_map> |
33 | | #include <unordered_set> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | #include "common/status.h" |
38 | | #include "olap/olap_common.h" |
39 | | #include "olap/rowset/pending_rowset_helper.h" |
40 | | #include "olap/rowset/rowset.h" |
41 | | #include "olap/rowset/rowset_meta.h" |
42 | | #include "olap/rowset/segment_v2/segment.h" |
43 | | #include "olap/rowset/segment_v2/segment_writer.h" |
44 | | #include "olap/tablet.h" |
45 | | #include "olap/tablet_meta.h" |
46 | | #include "runtime/memory/lru_cache_policy.h" |
47 | | #include "util/time.h" |
48 | | #include "vec/core/block.h" |
49 | | |
50 | | namespace doris { |
51 | | class DeltaWriter; |
52 | | class OlapMeta; |
53 | | struct TabletPublishStatistics; |
54 | | struct PartialUpdateInfo; |
55 | | |
// State of one transaction on one tablet, as tracked by TabletTxnInfo::state.
// The numeric values are spelled out explicitly so they stay stable if
// enumerators are ever reordered.
enum class TxnState {
    // NOTE(review): presumably reported when no entry exists for the queried
    // txn/tablet -- confirm against TxnManager::get_txn_state().
    NOT_FOUND = 0,
    // Initial state of a TabletTxnInfo; also set by TabletTxnInfo::prepare().
    PREPARED = 1,
    // Set by TabletTxnInfo::commit().
    COMMITTED = 2,
    // Set by TabletTxnInfo::rollback().
    ROLLEDBACK = 3,
    // Set by TabletTxnInfo::abort(), which only acts on a PREPARED txn.
    ABORTED = 4,
    // NOTE(review): no setter for this state is visible in this header --
    // confirm where it is assigned.
    DELETED = 5,
};
64 | | |
65 | | struct TabletTxnInfo { |
66 | | PUniqueId load_id; |
67 | | RowsetSharedPtr rowset; |
68 | | PendingRowsetGuard pending_rs_guard; |
69 | | bool unique_key_merge_on_write {false}; |
70 | | DeleteBitmapPtr delete_bitmap; |
71 | | // records rowsets calc in commit txn |
72 | | RowsetIdUnorderedSet rowset_ids; |
73 | | int64_t creation_time; |
74 | | bool ingest {false}; |
75 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
76 | | TxnState state {TxnState::PREPARED}; |
77 | | |
78 | | TabletTxnInfo() = default; |
79 | | |
80 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) |
81 | 29 | : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {} |
82 | | |
83 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) |
84 | 31 | : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {} |
85 | | |
86 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, |
87 | | DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids) |
88 | | : load_id(load_id), |
89 | | rowset(rowset), |
90 | | unique_key_merge_on_write(merge_on_write), |
91 | | delete_bitmap(delete_bitmap), |
92 | | rowset_ids(ids), |
93 | 0 | creation_time(UnixSeconds()) {} |
94 | | |
95 | 31 | void prepare() { state = TxnState::PREPARED; } |
96 | 29 | void commit() { state = TxnState::COMMITTED; } |
97 | 0 | void rollback() { state = TxnState::ROLLEDBACK; } |
98 | 0 | void abort() { |
99 | 0 | if (state == TxnState::PREPARED) { |
100 | 0 | state = TxnState::ABORTED; |
101 | 0 | } |
102 | 0 | } |
103 | | }; |
104 | | |
105 | | struct CommitTabletTxnInfo { |
106 | | TTransactionId transaction_id {0}; |
107 | | TPartitionId partition_id {0}; |
108 | | DeleteBitmapPtr delete_bitmap; |
109 | | RowsetIdUnorderedSet rowset_ids; |
110 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
111 | | }; |
112 | | |
113 | | using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>; |
114 | | |
115 | | // txn manager is used to manage mapping between tablet and txns |
116 | | class TxnManager { |
117 | | public: |
118 | | TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size); |
119 | | |
120 | 151 | ~TxnManager() { |
121 | 151 | delete[] _txn_tablet_maps; |
122 | 151 | delete[] _txn_partition_maps; |
123 | 151 | delete[] _txn_map_locks; |
124 | 151 | delete[] _txn_mutex; |
125 | 151 | delete[] _txn_tablet_delta_writer_map; |
126 | 151 | delete[] _txn_tablet_delta_writer_map_locks; |
127 | 151 | } |
128 | | |
129 | | class CacheValue : public LRUCacheValueBase { |
130 | | public: |
131 | | int64_t value; |
132 | | }; |
133 | | |
134 | | // add a txn to manager |
135 | | // partition id is useful in publish version stage because version is associated with partition |
136 | | Status prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
137 | | TTransactionId transaction_id, const PUniqueId& load_id, |
138 | | bool is_ingest = false); |
139 | | // most used for ut |
140 | | Status prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
141 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
142 | | bool is_ingest = false); |
143 | | |
144 | | Status commit_txn(TPartitionId partition_id, const Tablet& tablet, |
145 | | TTransactionId transaction_id, const PUniqueId& load_id, |
146 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
147 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr); |
148 | | |
149 | | Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
150 | | TTransactionId transaction_id, const Version& version, |
151 | | TabletPublishStatistics* stats, |
152 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info); |
153 | | |
154 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
155 | | Status rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
156 | | TTransactionId transaction_id); |
157 | | |
158 | | Status delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
159 | | TTransactionId transaction_id); |
160 | | |
161 | | Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
162 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
163 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
164 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr); |
165 | | |
166 | | // remove a txn from txn manager |
167 | | // not persist rowset meta because |
168 | | Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
169 | | TTabletId tablet_id, TabletUid tablet_uid, const Version& version, |
170 | | TabletPublishStatistics* stats, |
171 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info); |
172 | | |
173 | | // only abort not committed txn |
174 | | void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
175 | | TabletUid tablet_uid); |
176 | | |
177 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
178 | | Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
179 | | TTabletId tablet_id, TabletUid tablet_uid); |
180 | | |
181 | | // remove the txn from txn manager |
182 | | // delete the related rowset if it is not null |
183 | | // delete rowset related data if it is not null |
184 | | Status delete_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
185 | | TTabletId tablet_id, TabletUid tablet_uid); |
186 | | |
187 | | void get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, int64_t* partition_id, |
188 | | std::set<int64_t>* transaction_ids); |
189 | | |
190 | | void get_txn_related_tablets(const TTransactionId transaction_id, TPartitionId partition_ids, |
191 | | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos); |
192 | | |
193 | | void get_all_related_tablets(std::set<TabletInfo>* tablet_infos); |
194 | | |
195 | | // Get all expired txns and save them in expire_txn_map. |
196 | | // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets. |
197 | | void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map); |
198 | | |
199 | | void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
200 | | TabletUid tablet_uid); |
201 | | |
202 | | void get_partition_ids(const TTransactionId transaction_id, |
203 | | std::vector<TPartitionId>* partition_ids); |
204 | | |
205 | | void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
206 | | DeltaWriter* delta_writer); |
207 | | void clear_txn_tablet_delta_writer(int64_t transaction_id); |
208 | | void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, int64_t node_id, |
209 | | bool is_succeed); |
210 | | |
211 | | void set_txn_related_delete_bitmap(TPartitionId partition_id, TTransactionId transaction_id, |
212 | | TTabletId tablet_id, TabletUid tablet_uid, |
213 | | bool unique_key_merge_on_write, |
214 | | DeleteBitmapPtr delete_bitmap, |
215 | | const RowsetIdUnorderedSet& rowset_ids, |
216 | | std::shared_ptr<PartialUpdateInfo> partial_update_info); |
217 | | void get_all_commit_tablet_txn_info_by_tablet( |
218 | | const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); |
219 | | |
220 | | int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); |
221 | | void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); |
222 | | |
223 | | TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
224 | | TTabletId tablet_id, TabletUid tablet_uid); |
225 | | |
226 | | void remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id, |
227 | | TTabletId tablet_id, TabletUid tablet_uid); |
228 | | |
229 | | private: |
230 | | using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; |
231 | | |
232 | | // Implement TxnKey hash function to support TxnKey as a key for `unordered_map`. |
233 | | struct TxnKeyHash { |
234 | | template <typename T, typename U> |
235 | 221 | size_t operator()(const std::pair<T, U>& e) const { |
236 | 221 | return std::hash<T>()(e.first) ^ std::hash<U>()(e.second); |
237 | 221 | } |
238 | | }; |
239 | | |
240 | | // Implement TxnKey equal function to support TxnKey as a key for `unordered_map`. |
241 | | struct TxnKeyEqual { |
242 | | template <class T, typename U> |
243 | 126 | bool operator()(const std::pair<T, U>& l, const std::pair<T, U>& r) const { |
244 | 126 | return l.first == r.first && l.second == r.second; |
245 | 126 | } |
246 | | }; |
247 | | |
248 | | using txn_tablet_map_t = |
249 | | std::unordered_map<TxnKey, std::map<TabletInfo, std::shared_ptr<TabletTxnInfo>>, |
250 | | TxnKeyHash, TxnKeyEqual>; |
251 | | using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>; |
252 | | using txn_tablet_delta_writer_map_t = |
253 | | std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>; |
254 | | |
255 | | std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId); |
256 | | |
257 | | txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId); |
258 | | |
259 | | txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId); |
260 | | |
261 | | inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId); |
262 | | |
263 | | std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId); |
264 | | |
265 | | txn_tablet_delta_writer_map_t& _get_txn_tablet_delta_writer_map(TTransactionId transactionId); |
266 | | |
267 | | // Insert or remove (transaction_id, partition_id) from _txn_partition_map |
268 | | // get _txn_map_lock before calling. |
269 | | void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
270 | | void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
271 | | |
272 | | void _remove_txn_tablet_info_unlocked(TPartitionId partition_id, TTransactionId transaction_id, |
273 | | TTabletId tablet_id, TabletUid tablet_uid, |
274 | | std::lock_guard<std::shared_mutex>& txn_lock, |
275 | | std::lock_guard<std::shared_mutex>& wrlock); |
276 | | |
277 | | class TabletVersionCache : public LRUCachePolicyTrackingManual { |
278 | | public: |
279 | | TabletVersionCache(size_t capacity) |
280 | | : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_VERSION_CACHE, |
281 | | capacity, LRUCacheType::NUMBER, -1, |
282 | | DEFAULT_LRU_CACHE_NUM_SHARDS, |
283 | 153 | DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} |
284 | | }; |
285 | | |
286 | | private: |
287 | | StorageEngine& _engine; |
288 | | |
289 | | const int32_t _txn_map_shard_size; |
290 | | |
291 | | const int32_t _txn_shard_size; |
292 | | |
293 | | // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size |
294 | | txn_tablet_map_t* _txn_tablet_maps = nullptr; |
295 | | // transaction_id -> corresponding partition ids |
296 | | // This is mainly for the clear txn task received from FE, which may only has transaction id, |
297 | | // so we need this map to find out which partitions are corresponding to a transaction id. |
298 | | // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]' |
299 | | txn_partition_map_t* _txn_partition_maps = nullptr; |
300 | | |
301 | | std::shared_mutex* _txn_map_locks = nullptr; |
302 | | |
303 | | std::shared_mutex* _txn_mutex = nullptr; |
304 | | |
305 | | txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr; |
306 | | std::unique_ptr<TabletVersionCache> _tablet_version_cache; |
307 | | std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr; |
308 | | DISALLOW_COPY_AND_ASSIGN(TxnManager); |
309 | | }; // TxnManager |
310 | | |
311 | 161 | inline std::shared_mutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) { |
312 | 161 | return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
313 | 161 | } |
314 | | |
315 | 159 | inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) { |
316 | 159 | return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)]; |
317 | 159 | } |
318 | | |
319 | | inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map( |
320 | 113 | TTransactionId transactionId) { |
321 | 113 | return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)]; |
322 | 113 | } |
323 | | |
324 | 56 | inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { |
325 | 56 | return _txn_mutex[transactionId & (_txn_shard_size - 1)]; |
326 | 56 | } |
327 | | |
328 | | inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock( |
329 | 0 | TTransactionId transactionId) { |
330 | 0 | return _txn_tablet_delta_writer_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
331 | 0 | } |
332 | | |
333 | | inline TxnManager::txn_tablet_delta_writer_map_t& TxnManager::_get_txn_tablet_delta_writer_map( |
334 | 0 | TTransactionId transactionId) { |
335 | 0 | return _txn_tablet_delta_writer_map[transactionId & (_txn_map_shard_size - 1)]; |
336 | 0 | } |
337 | | |
338 | | } // namespace doris |