/root/doris/be/src/olap/txn_manager.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <butil/macros.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/types.pb.h> |
23 | | #include <stddef.h> |
24 | | #include <stdint.h> |
25 | | |
26 | | #include <boost/container/detail/std_fwd.hpp> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <set> |
31 | | #include <shared_mutex> |
32 | | #include <unordered_map> |
33 | | #include <unordered_set> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | #include "common/status.h" |
38 | | #include "olap/olap_common.h" |
39 | | #include "olap/rowset/pending_rowset_helper.h" |
40 | | #include "olap/rowset/rowset.h" |
41 | | #include "olap/rowset/rowset_meta.h" |
42 | | #include "olap/rowset/segment_v2/segment.h" |
43 | | #include "olap/rowset/segment_v2/segment_writer.h" |
44 | | #include "olap/tablet.h" |
45 | | #include "olap/tablet_meta.h" |
46 | | #include "runtime/memory/lru_cache_policy.h" |
47 | | #include "util/time.h" |
48 | | #include "vec/core/block.h" |
49 | | |
50 | | namespace doris { |
51 | | class DeltaWriter; |
52 | | class OlapMeta; |
53 | | struct TabletPublishStatistics; |
54 | | struct PartialUpdateInfo; |
55 | | |
56 | | enum class TxnState { |
57 | | NOT_FOUND = 0, |
58 | | PREPARED = 1, |
59 | | COMMITTED = 2, |
60 | | ROLLEDBACK = 3, |
61 | | ABORTED = 4, |
62 | | DELETED = 5, |
63 | | }; |
64 | | enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 }; |
65 | | |
66 | | struct TxnPublishInfo { |
67 | | int64_t publish_version {-1}; |
68 | | int64_t base_compaction_cnt {-1}; |
69 | | int64_t cumulative_compaction_cnt {-1}; |
70 | | int64_t cumulative_point {-1}; |
71 | | }; |
72 | | |
73 | | struct TabletTxnInfo { |
74 | | PUniqueId load_id; |
75 | | RowsetSharedPtr rowset; |
76 | | PendingRowsetGuard pending_rs_guard; |
77 | | bool unique_key_merge_on_write {false}; |
78 | | DeleteBitmapPtr delete_bitmap; |
79 | | // records rowsets calc in commit txn |
80 | | RowsetIdUnorderedSet rowset_ids; |
81 | | int64_t creation_time; |
82 | | bool ingest {false}; |
83 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
84 | | |
85 | | // for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask |
86 | | // needs to re-calculate the delete bitmap |
87 | | std::shared_ptr<PublishStatus> publish_status; |
88 | | TxnPublishInfo publish_info; |
89 | | |
90 | | TxnState state {TxnState::PREPARED}; |
91 | 0 | TabletTxnInfo() = default; |
92 | | |
93 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) |
94 | | : load_id(std::move(load_id)), |
95 | | rowset(std::move(rowset)), |
96 | 29 | creation_time(UnixSeconds()) {} |
97 | | |
98 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) |
99 | | : load_id(std::move(load_id)), |
100 | | rowset(std::move(rowset)), |
101 | | creation_time(UnixSeconds()), |
102 | 32 | ingest(ingest_arg) {} |
103 | | |
104 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, |
105 | | DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids) |
106 | | : load_id(std::move(load_id)), |
107 | | rowset(std::move(rowset)), |
108 | | unique_key_merge_on_write(merge_on_write), |
109 | | delete_bitmap(std::move(delete_bitmap)), |
110 | | rowset_ids(std::move(ids)), |
111 | 0 | creation_time(UnixSeconds()) {} |
112 | | |
113 | 32 | void prepare() { state = TxnState::PREPARED; } |
114 | 29 | void commit() { state = TxnState::COMMITTED; } |
115 | 0 | void rollback() { state = TxnState::ROLLEDBACK; } |
116 | 0 | void abort() { |
117 | 0 | if (state == TxnState::PREPARED) { |
118 | 0 | state = TxnState::ABORTED; |
119 | 0 | } |
120 | 0 | } |
121 | | }; |
122 | | |
123 | | struct CommitTabletTxnInfo { |
124 | | TTransactionId transaction_id {0}; |
125 | | TPartitionId partition_id {0}; |
126 | | DeleteBitmapPtr delete_bitmap; |
127 | | RowsetIdUnorderedSet rowset_ids; |
128 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
129 | | }; |
130 | | |
131 | | using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>; |
132 | | |
133 | | // txn manager is used to manage mapping between tablet and txns |
134 | | class TxnManager { |
135 | | public: |
136 | | TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size); |
137 | | |
138 | 151 | ~TxnManager() { |
139 | 151 | delete[] _txn_tablet_maps; |
140 | 151 | delete[] _txn_partition_maps; |
141 | 151 | delete[] _txn_map_locks; |
142 | 151 | delete[] _txn_mutex; |
143 | 151 | delete[] _txn_tablet_delta_writer_map; |
144 | 151 | delete[] _txn_tablet_delta_writer_map_locks; |
145 | 151 | } |
146 | | |
147 | | class CacheValue : public LRUCacheValueBase { |
148 | | public: |
149 | | int64_t value; |
150 | | }; |
151 | | |
152 | | // add a txn to manager |
153 | | // partition id is useful in publish version stage because version is associated with partition |
154 | | Status prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
155 | | TTransactionId transaction_id, const PUniqueId& load_id, |
156 | | bool is_ingest = false); |
157 | | // most used for ut |
158 | | Status prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
159 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
160 | | bool is_ingest = false); |
161 | | |
162 | | Status commit_txn(TPartitionId partition_id, const Tablet& tablet, |
163 | | TTransactionId transaction_id, const PUniqueId& load_id, |
164 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
165 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr); |
166 | | |
167 | | Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
168 | | TTransactionId transaction_id, const Version& version, |
169 | | TabletPublishStatistics* stats); |
170 | | |
171 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
172 | | Status rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
173 | | TTransactionId transaction_id); |
174 | | |
175 | | Status delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
176 | | TTransactionId transaction_id); |
177 | | |
178 | | Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
179 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
180 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
181 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr); |
182 | | |
183 | | // remove a txn from txn manager |
184 | | // not persist rowset meta because |
185 | | Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
186 | | TTabletId tablet_id, TabletUid tablet_uid, const Version& version, |
187 | | TabletPublishStatistics* stats); |
188 | | |
189 | | // only abort not committed txn |
190 | | void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
191 | | TabletUid tablet_uid); |
192 | | |
193 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
194 | | Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
195 | | TTabletId tablet_id, TabletUid tablet_uid); |
196 | | |
197 | | // remove the txn from txn manager |
198 | | // delete the related rowset if it is not null |
199 | | // delete rowset related data if it is not null |
200 | | Status delete_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
201 | | TTabletId tablet_id, TabletUid tablet_uid); |
202 | | |
203 | | void get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, int64_t* partition_id, |
204 | | std::set<int64_t>* transaction_ids); |
205 | | |
206 | | void get_txn_related_tablets(const TTransactionId transaction_id, TPartitionId partition_ids, |
207 | | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos); |
208 | | |
209 | | void get_all_related_tablets(std::set<TabletInfo>* tablet_infos); |
210 | | |
211 | | // Get all expired txns and save them in expire_txn_map. |
212 | | // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets. |
213 | | void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map); |
214 | | |
215 | | void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
216 | | TabletUid tablet_uid); |
217 | | |
218 | | void get_partition_ids(const TTransactionId transaction_id, |
219 | | std::vector<TPartitionId>* partition_ids); |
220 | | |
221 | | void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
222 | | DeltaWriter* delta_writer); |
223 | | void clear_txn_tablet_delta_writer(int64_t transaction_id); |
224 | | void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, int64_t node_id, |
225 | | bool is_succeed); |
226 | | |
227 | | void set_txn_related_delete_bitmap(TPartitionId partition_id, TTransactionId transaction_id, |
228 | | TTabletId tablet_id, TabletUid tablet_uid, |
229 | | bool unique_key_merge_on_write, |
230 | | DeleteBitmapPtr delete_bitmap, |
231 | | const RowsetIdUnorderedSet& rowset_ids, |
232 | | std::shared_ptr<PartialUpdateInfo> partial_update_info); |
233 | | void get_all_commit_tablet_txn_info_by_tablet( |
234 | | const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); |
235 | | |
236 | | int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); |
237 | | void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); |
238 | | |
239 | | TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
240 | | TTabletId tablet_id, TabletUid tablet_uid); |
241 | | |
242 | | private: |
243 | | using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; |
244 | | |
245 | | // Implement TxnKey hash function to support TxnKey as a key for `unordered_map`. |
246 | | struct TxnKeyHash { |
247 | | template <typename T, typename U> |
248 | 195 | size_t operator()(const std::pair<T, U>& e) const { |
249 | 195 | return std::hash<T>()(e.first) ^ std::hash<U>()(e.second); |
250 | 195 | } |
251 | | }; |
252 | | |
253 | | // Implement TxnKey equal function to support TxnKey as a key for `unordered_map`. |
254 | | struct TxnKeyEqual { |
255 | | template <class T, typename U> |
256 | 125 | bool operator()(const std::pair<T, U>& l, const std::pair<T, U>& r) const { |
257 | 125 | return l.first == r.first && l.second == r.second; |
258 | 125 | } |
259 | | }; |
260 | | |
261 | | using txn_tablet_map_t = |
262 | | std::unordered_map<TxnKey, std::map<TabletInfo, std::shared_ptr<TabletTxnInfo>>, |
263 | | TxnKeyHash, TxnKeyEqual>; |
264 | | using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>; |
265 | | using txn_tablet_delta_writer_map_t = |
266 | | std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>; |
267 | | |
268 | | std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId); |
269 | | |
270 | | txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId); |
271 | | |
272 | | txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId); |
273 | | |
274 | | inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId); |
275 | | |
276 | | std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId); |
277 | | |
278 | | txn_tablet_delta_writer_map_t& _get_txn_tablet_delta_writer_map(TTransactionId transactionId); |
279 | | |
280 | | // Insert or remove (transaction_id, partition_id) from _txn_partition_map |
281 | | // get _txn_map_lock before calling. |
282 | | void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
283 | | void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
284 | | |
285 | | class TabletVersionCache : public LRUCachePolicy { |
286 | | public: |
287 | | TabletVersionCache(size_t capacity) |
288 | | : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, |
289 | | LRUCacheType::NUMBER, -1, DEFAULT_LRU_CACHE_NUM_SHARDS, |
290 | 151 | DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {} |
291 | | }; |
292 | | |
293 | | private: |
294 | | StorageEngine& _engine; |
295 | | |
296 | | const int32_t _txn_map_shard_size; |
297 | | |
298 | | const int32_t _txn_shard_size; |
299 | | |
300 | | // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size |
301 | | txn_tablet_map_t* _txn_tablet_maps = nullptr; |
302 | | // transaction_id -> corresponding partition ids |
303 | | // This is mainly for the clear txn task received from FE, which may only has transaction id, |
304 | | // so we need this map to find out which partitions are corresponding to a transaction id. |
305 | | // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]' |
306 | | txn_partition_map_t* _txn_partition_maps = nullptr; |
307 | | |
308 | | std::shared_mutex* _txn_map_locks = nullptr; |
309 | | |
310 | | std::shared_mutex* _txn_mutex = nullptr; |
311 | | |
312 | | txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr; |
313 | | std::unique_ptr<TabletVersionCache> _tablet_version_cache; |
314 | | std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr; |
315 | | DISALLOW_COPY_AND_ASSIGN(TxnManager); |
316 | | }; // TxnManager |
317 | | |
318 | 163 | inline std::shared_mutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) { |
319 | 163 | return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
320 | 163 | } |
321 | | |
322 | 163 | inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) { |
323 | 163 | return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)]; |
324 | 163 | } |
325 | | |
326 | | inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map( |
327 | 113 | TTransactionId transactionId) { |
328 | 113 | return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)]; |
329 | 113 | } |
330 | | |
331 | 59 | inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { |
332 | 59 | return _txn_mutex[transactionId & (_txn_shard_size - 1)]; |
333 | 59 | } |
334 | | |
335 | | inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock( |
336 | 0 | TTransactionId transactionId) { |
337 | 0 | return _txn_tablet_delta_writer_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
338 | 0 | } |
339 | | |
340 | | inline TxnManager::txn_tablet_delta_writer_map_t& TxnManager::_get_txn_tablet_delta_writer_map( |
341 | 0 | TTransactionId transactionId) { |
342 | 0 | return _txn_tablet_delta_writer_map[transactionId & (_txn_map_shard_size - 1)]; |
343 | 0 | } |
344 | | |
345 | | } // namespace doris |