Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <butil/macros.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <gen_cpp/types.pb.h> |
23 | | #include <stddef.h> |
24 | | #include <stdint.h> |
25 | | |
26 | | #include <boost/container/detail/std_fwd.hpp> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <set> |
31 | | #include <shared_mutex> |
32 | | #include <unordered_map> |
33 | | #include <unordered_set> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | #include "common/status.h" |
38 | | #include "core/block/block.h" |
39 | | #include "runtime/memory/lru_cache_policy.h" |
40 | | #include "storage/olap_common.h" |
41 | | #include "storage/rowset/pending_rowset_helper.h" |
42 | | #include "storage/rowset/rowset.h" |
43 | | #include "storage/rowset/rowset_meta.h" |
44 | | #include "storage/segment/segment.h" |
45 | | #include "storage/segment/segment_writer.h" |
46 | | #include "storage/tablet/tablet.h" |
47 | | #include "storage/tablet/tablet_meta.h" |
48 | | #include "util/time.h" |
49 | | |
50 | | namespace doris { |
51 | | class DeltaWriter; |
52 | | class OlapMeta; |
53 | | struct TabletPublishStatistics; |
54 | | struct PartialUpdateInfo; |
55 | | |
// State of one tablet's participation in a transaction.
// The numeric value of every enumerator is spelled out explicitly.
enum class TxnState {
    // Presumably reported when no txn info is registered for the queried
    // tablet (see TxnManager::get_txn_state) -- confirm against the .cpp.
    NOT_FOUND = 0,
    // Initial state of a TabletTxnInfo; also set by TabletTxnInfo::prepare().
    PREPARED = 1,
    // Set by TabletTxnInfo::commit().
    COMMITTED = 2,
    // Set by TabletTxnInfo::rollback().
    ROLLEDBACK = 3,
    // Set by TabletTxnInfo::abort(), which only acts on a PREPARED txn.
    ABORTED = 4,
    // No setter for this state is visible in this header.
    DELETED = 5,
};
// Progress marker of a publish attempt; stored in TabletTxnInfo::publish_status.
enum class PublishStatus {
    INIT = 0,
    PREPARE = 1,
    SUCCEED = 2,
};
65 | | |
// Version and compaction counters captured for a publish.
// Every field defaults to -1.
struct TxnPublishInfo {
    int64_t publish_version = -1;
    int64_t base_compaction_cnt = -1;
    int64_t cumulative_compaction_cnt = -1;
    int64_t cumulative_point = -1;
};
72 | | |
73 | | struct TabletTxnInfo { |
74 | | PUniqueId load_id; |
75 | | RowsetSharedPtr rowset; |
76 | | // The list of rowsets committed along with the transaction rowset |
77 | | // currently contains only the binlog<Row> rowset. |
78 | | std::vector<RowsetSharedPtr> attach_rowsets; |
79 | | PendingRowsetGuard pending_rs_guard; |
80 | | bool unique_key_merge_on_write {false}; |
81 | | DeleteBitmapPtr delete_bitmap; |
82 | | // copy delete_bitmap of data rowset to binlog |
83 | | DeleteBitmapPtr binlog_delvec; |
84 | | // records rowsets calc in commit txn |
85 | | RowsetIdUnorderedSet rowset_ids; |
86 | | int64_t creation_time; |
87 | | bool ingest {false}; |
88 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
89 | | |
90 | | // for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask |
91 | | // needs to re-calculate the delete bitmap |
92 | | std::shared_ptr<PublishStatus> publish_status; |
93 | | TxnPublishInfo publish_info; |
94 | | |
95 | | // for cloud only, used to calculate delete bitmap for txn load |
96 | | bool is_txn_load = false; |
97 | | std::vector<RowsetSharedPtr> invisible_rowsets; |
98 | | int64_t lock_id; |
99 | | int64_t next_visible_version; |
100 | | |
101 | | TxnState state {TxnState::PREPARED}; |
102 | 0 | TabletTxnInfo() = default; |
103 | | |
104 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) |
105 | 36 | : load_id(std::move(load_id)), |
106 | 36 | rowset(std::move(rowset)), |
107 | 36 | creation_time(UnixSeconds()) {} |
108 | | |
109 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg) |
110 | 36 | : load_id(std::move(load_id)), |
111 | 36 | rowset(std::move(rowset)), |
112 | 36 | creation_time(UnixSeconds()), |
113 | 36 | ingest(ingest_arg) {} |
114 | | |
115 | | TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write, |
116 | | DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids) |
117 | | : load_id(std::move(load_id)), |
118 | | rowset(std::move(rowset)), |
119 | | unique_key_merge_on_write(merge_on_write), |
120 | | delete_bitmap(std::move(delete_bitmap)), |
121 | | rowset_ids(std::move(ids)), |
122 | 0 | creation_time(UnixSeconds()) {} |
123 | | |
124 | 36 | void prepare() { state = TxnState::PREPARED; } |
125 | 36 | void commit() { state = TxnState::COMMITTED; } |
126 | 0 | void rollback() { state = TxnState::ROLLEDBACK; } |
127 | 0 | void abort() { |
128 | 0 | if (state == TxnState::PREPARED) { |
129 | 0 | state = TxnState::ABORTED; |
130 | 0 | } |
131 | 0 | } |
132 | | }; |
133 | | |
134 | | struct CommitTabletTxnInfo { |
135 | | TTransactionId transaction_id {0}; |
136 | | TPartitionId partition_id {0}; |
137 | | DeleteBitmapPtr delete_bitmap; |
138 | | RowsetIdUnorderedSet rowset_ids; |
139 | | std::shared_ptr<PartialUpdateInfo> partial_update_info; |
140 | | }; |
141 | | |
142 | | using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>; |
143 | | |
144 | | // txn manager is used to manage mapping between tablet and txns |
145 | | class TxnManager { |
146 | | public: |
147 | | TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size); |
148 | | |
149 | 375 | ~TxnManager() { |
150 | 375 | delete[] _txn_tablet_maps; |
151 | 375 | delete[] _txn_partition_maps; |
152 | 375 | delete[] _txn_map_locks; |
153 | 375 | delete[] _txn_mutex; |
154 | 375 | delete[] _txn_tablet_delta_writer_map; |
155 | 375 | delete[] _txn_tablet_delta_writer_map_locks; |
156 | 375 | } |
157 | | |
158 | | class CacheValue : public LRUCacheValueBase { |
159 | | public: |
160 | | int64_t value; |
161 | | }; |
162 | | |
163 | | // add a txn to manager |
164 | | // partition id is useful in publish version stage because version is associated with partition |
165 | | Status prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
166 | | TTransactionId transaction_id, const PUniqueId& load_id, |
167 | | bool is_ingest = false); |
168 | | // most used for ut |
169 | | Status prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
170 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
171 | | bool is_ingest = false); |
172 | | |
173 | | Status commit_txn(TPartitionId partition_id, const Tablet& tablet, |
174 | | TTransactionId transaction_id, const PUniqueId& load_id, |
175 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
176 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr, |
177 | | std::vector<RowsetSharedPtr>* attach_rowsets = nullptr); |
178 | | |
179 | | Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
180 | | TTransactionId transaction_id, const Version& version, |
181 | | TabletPublishStatistics* stats, |
182 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info, |
183 | | const int64_t commit_tso = -1); |
184 | | |
185 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
186 | | Status rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
187 | | TTransactionId transaction_id); |
188 | | |
189 | | Status delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
190 | | TTransactionId transaction_id); |
191 | | |
192 | | Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
193 | | TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
194 | | const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery, |
195 | | std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr, |
196 | | std::vector<RowsetSharedPtr>* attach_rowsets = nullptr); |
197 | | |
198 | | // remove a txn from txn manager |
199 | | // not persist rowset meta because |
200 | | Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
201 | | TTabletId tablet_id, TabletUid tablet_uid, const Version& version, |
202 | | TabletPublishStatistics* stats, |
203 | | std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info, |
204 | | const int64_t commit_tso = -1); |
205 | | |
206 | | // only abort not committed txn |
207 | | void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
208 | | TabletUid tablet_uid); |
209 | | |
210 | | // delete the txn from manager if it is not committed(not have a valid rowset) |
211 | | Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
212 | | TTabletId tablet_id, TabletUid tablet_uid); |
213 | | |
214 | | // remove the txn from txn manager |
215 | | // delete the related rowset if it is not null |
216 | | // delete rowset related data if it is not null |
217 | | Status delete_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id, |
218 | | TTabletId tablet_id, TabletUid tablet_uid); |
219 | | |
220 | | void get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, int64_t* partition_id, |
221 | | std::set<int64_t>* transaction_ids); |
222 | | |
223 | | void get_txn_related_tablets( |
224 | | const TTransactionId transaction_id, TPartitionId partition_ids, |
225 | | std::map<TabletInfo, RowsetSharedPtr>* tablet_infos, |
226 | | std::map<TabletInfo, std::vector<RowsetSharedPtr>>* tablet_attach_rowsets = nullptr); |
227 | | |
228 | | void get_all_related_tablets(std::set<TabletInfo>* tablet_infos); |
229 | | |
230 | | // Get all expired txns and save them in expire_txn_map. |
231 | | // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets. |
232 | | void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map); |
233 | | |
234 | | void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
235 | | TabletUid tablet_uid); |
236 | | |
237 | | void get_partition_ids(const TTransactionId transaction_id, |
238 | | std::vector<TPartitionId>* partition_ids); |
239 | | |
240 | | void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
241 | | DeltaWriter* delta_writer); |
242 | | void clear_txn_tablet_delta_writer(int64_t transaction_id); |
243 | | void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, int64_t node_id, |
244 | | bool is_succeed); |
245 | | |
246 | | void set_txn_related_delete_bitmap(TPartitionId partition_id, TTransactionId transaction_id, |
247 | | TTabletId tablet_id, TabletUid tablet_uid, |
248 | | bool unique_key_merge_on_write, |
249 | | DeleteBitmapPtr delete_bitmap, |
250 | | const RowsetIdUnorderedSet& rowset_ids, |
251 | | std::shared_ptr<PartialUpdateInfo> partial_update_info); |
252 | | void get_all_commit_tablet_txn_info_by_tablet( |
253 | | const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); |
254 | | |
255 | | int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); |
256 | | void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); |
257 | | |
258 | | TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
259 | | TTabletId tablet_id, TabletUid tablet_uid); |
260 | | |
261 | | void remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id, |
262 | | TTabletId tablet_id, TabletUid tablet_uid); |
263 | | |
264 | | private: |
265 | | using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id; |
266 | | |
267 | | // Implement TxnKey hash function to support TxnKey as a key for `unordered_map`. |
268 | | struct TxnKeyHash { |
269 | | template <typename T, typename U> |
270 | 269 | size_t operator()(const std::pair<T, U>& e) const { |
271 | 269 | return std::hash<T>()(e.first) ^ std::hash<U>()(e.second); |
272 | 269 | } |
273 | | }; |
274 | | |
275 | | // Implement TxnKey equal function to support TxnKey as a key for `unordered_map`. |
276 | | struct TxnKeyEqual { |
277 | | template <class T, typename U> |
278 | 142 | bool operator()(const std::pair<T, U>& l, const std::pair<T, U>& r) const { |
279 | 142 | return l.first == r.first && l.second == r.second; |
280 | 142 | } |
281 | | }; |
282 | | |
283 | | using txn_tablet_map_t = |
284 | | std::unordered_map<TxnKey, std::map<TabletInfo, std::shared_ptr<TabletTxnInfo>>, |
285 | | TxnKeyHash, TxnKeyEqual>; |
286 | | using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>; |
287 | | using txn_tablet_delta_writer_map_t = |
288 | | std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>; |
289 | | |
290 | | std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId); |
291 | | |
292 | | txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId); |
293 | | |
294 | | txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId); |
295 | | |
296 | | inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId); |
297 | | |
298 | | std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId); |
299 | | |
300 | | txn_tablet_delta_writer_map_t& _get_txn_tablet_delta_writer_map(TTransactionId transactionId); |
301 | | |
302 | | // Insert or remove (transaction_id, partition_id) from _txn_partition_map |
303 | | // get _txn_map_lock before calling. |
304 | | void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
305 | | void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id); |
306 | | |
307 | | void _remove_txn_tablet_info_unlocked(TPartitionId partition_id, TTransactionId transaction_id, |
308 | | TTabletId tablet_id, TabletUid tablet_uid, |
309 | | std::lock_guard<std::shared_mutex>& txn_lock, |
310 | | std::lock_guard<std::shared_mutex>& wrlock); |
311 | | |
312 | | class TabletVersionCache : public LRUCachePolicy { |
313 | | public: |
314 | | TabletVersionCache(size_t capacity) |
315 | 375 | : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity, |
316 | 375 | LRUCacheType::NUMBER, /*sweeptime*/ -1, |
317 | 375 | /*num_shards*/ 32, |
318 | 375 | /*element_count_capacity*/ 0, /*enable_prune*/ false, |
319 | 375 | /*is_lru_k*/ false) {} |
320 | | }; |
321 | | |
322 | | private: |
323 | | StorageEngine& _engine; |
324 | | |
325 | | const int32_t _txn_map_shard_size; |
326 | | |
327 | | const int32_t _txn_shard_size; |
328 | | |
329 | | // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size |
330 | | txn_tablet_map_t* _txn_tablet_maps = nullptr; |
331 | | // transaction_id -> corresponding partition ids |
332 | | // This is mainly for the clear txn task received from FE, which may only has transaction id, |
333 | | // so we need this map to find out which partitions are corresponding to a transaction id. |
334 | | // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]' |
335 | | txn_partition_map_t* _txn_partition_maps = nullptr; |
336 | | |
337 | | std::shared_mutex* _txn_map_locks = nullptr; |
338 | | |
339 | | std::shared_mutex* _txn_mutex = nullptr; |
340 | | |
341 | | txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr; |
342 | | std::unique_ptr<TabletVersionCache> _tablet_version_cache; |
343 | | std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr; |
344 | | DISALLOW_COPY_AND_ASSIGN(TxnManager); |
345 | | }; // TxnManager |
346 | | |
347 | 199 | inline std::shared_mutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) { |
348 | 199 | return _txn_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
349 | 199 | } |
350 | | |
351 | 197 | inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) { |
352 | 197 | return _txn_tablet_maps[transactionId & (_txn_map_shard_size - 1)]; |
353 | 197 | } |
354 | | |
355 | | inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map( |
356 | 138 | TTransactionId transactionId) { |
357 | 138 | return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)]; |
358 | 138 | } |
359 | | |
360 | 76 | inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) { |
361 | 76 | return _txn_mutex[transactionId & (_txn_shard_size - 1)]; |
362 | 76 | } |
363 | | |
364 | | inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock( |
365 | 0 | TTransactionId transactionId) { |
366 | 0 | return _txn_tablet_delta_writer_map_locks[transactionId & (_txn_map_shard_size - 1)]; |
367 | 0 | } |
368 | | |
369 | | inline TxnManager::txn_tablet_delta_writer_map_t& TxnManager::_get_txn_tablet_delta_writer_map( |
370 | 0 | TTransactionId transactionId) { |
371 | 0 | return _txn_tablet_delta_writer_map[transactionId & (_txn_map_shard_size - 1)]; |
372 | 0 | } |
373 | | |
374 | | } // namespace doris |