Coverage Report

Created: 2025-05-01 03:10

/root/doris/be/src/olap/txn_manager.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/types.pb.h>
23
#include <stddef.h>
24
#include <stdint.h>
25
26
#include <boost/container/detail/std_fwd.hpp>
27
#include <map>
28
#include <memory>
29
#include <mutex>
30
#include <set>
31
#include <shared_mutex>
32
#include <unordered_map>
33
#include <unordered_set>
34
#include <utility>
35
#include <vector>
36
37
#include "common/status.h"
38
#include "olap/olap_common.h"
39
#include "olap/rowset/pending_rowset_helper.h"
40
#include "olap/rowset/rowset.h"
41
#include "olap/rowset/rowset_meta.h"
42
#include "olap/rowset/segment_v2/segment.h"
43
#include "olap/rowset/segment_v2/segment_writer.h"
44
#include "olap/tablet.h"
45
#include "olap/tablet_meta.h"
46
#include "runtime/memory/lru_cache_policy.h"
47
#include "util/time.h"
48
#include "vec/core/block.h"
49
50
namespace doris {
51
class DeltaWriter;
52
class OlapMeta;
53
struct TabletPublishStatistics;
54
struct PartialUpdateInfo;
55
56
/// Lifecycle state recorded for a transaction on a tablet.
///
/// The numeric values are spelled out explicitly; keep them stable in case
/// they are logged or compared numerically elsewhere (not visible here).
enum class TxnState {
    /// NOTE(review): presumably reported when no matching txn is registered — confirm in get_txn_state().
    NOT_FOUND = 0,
    /// Initial state of a TabletTxnInfo; also set by TabletTxnInfo::prepare().
    PREPARED = 1,
    /// Set by TabletTxnInfo::commit().
    COMMITTED = 2,
    /// Set by TabletTxnInfo::rollback().
    ROLLEDBACK = 3,
    /// Set by TabletTxnInfo::abort(), which only takes effect from PREPARED.
    ABORTED = 4,
    /// NOTE(review): not assigned anywhere in this header — confirm usage in the implementation file.
    DELETED = 5,
};
64
65
struct TabletTxnInfo {
66
    PUniqueId load_id;
67
    RowsetSharedPtr rowset;
68
    PendingRowsetGuard pending_rs_guard;
69
    bool unique_key_merge_on_write {false};
70
    DeleteBitmapPtr delete_bitmap;
71
    // records rowsets calc in commit txn
72
    RowsetIdUnorderedSet rowset_ids;
73
    int64_t creation_time;
74
    bool ingest {false};
75
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
76
    TxnState state {TxnState::PREPARED};
77
78
    TabletTxnInfo() = default;
79
80
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
81
29
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
82
83
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
84
31
            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), ingest(ingest_arg) {}
85
86
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write,
87
                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& ids)
88
            : load_id(load_id),
89
              rowset(rowset),
90
              unique_key_merge_on_write(merge_on_write),
91
              delete_bitmap(delete_bitmap),
92
              rowset_ids(ids),
93
0
              creation_time(UnixSeconds()) {}
94
95
31
    void prepare() { state = TxnState::PREPARED; }
96
29
    void commit() { state = TxnState::COMMITTED; }
97
0
    void rollback() { state = TxnState::ROLLEDBACK; }
98
0
    void abort() {
99
0
        if (state == TxnState::PREPARED) {
100
0
            state = TxnState::ABORTED;
101
0
        }
102
0
    }
103
};
104
105
struct CommitTabletTxnInfo {
106
    TTransactionId transaction_id {0};
107
    TPartitionId partition_id {0};
108
    DeleteBitmapPtr delete_bitmap;
109
    RowsetIdUnorderedSet rowset_ids;
110
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
111
};
112
113
using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>;
114
115
// txn manager is used to manage mapping between tablet and txns
116
class TxnManager {
117
public:
118
    TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size);
119
120
151
    ~TxnManager() {
121
151
        delete[] _txn_tablet_maps;
122
151
        delete[] _txn_partition_maps;
123
151
        delete[] _txn_map_locks;
124
151
        delete[] _txn_mutex;
125
151
        delete[] _txn_tablet_delta_writer_map;
126
151
        delete[] _txn_tablet_delta_writer_map_locks;
127
151
    }
128
129
    class CacheValue : public LRUCacheValueBase {
130
    public:
131
        int64_t value;
132
    };
133
134
    // add a txn to manager
135
    // partition id is useful in publish version stage because version is associated with partition
136
    Status prepare_txn(TPartitionId partition_id, const Tablet& tablet,
137
                       TTransactionId transaction_id, const PUniqueId& load_id,
138
                       bool is_ingest = false);
139
    // most used for ut
140
    Status prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
141
                       TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
142
                       bool is_ingest = false);
143
144
    Status commit_txn(TPartitionId partition_id, const Tablet& tablet,
145
                      TTransactionId transaction_id, const PUniqueId& load_id,
146
                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery,
147
                      std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr);
148
149
    Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
150
                       TTransactionId transaction_id, const Version& version,
151
                       TabletPublishStatistics* stats,
152
                       std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
153
154
    // delete the txn from manager if it is not committed(not have a valid rowset)
155
    Status rollback_txn(TPartitionId partition_id, const Tablet& tablet,
156
                        TTransactionId transaction_id);
157
158
    Status delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
159
                      TTransactionId transaction_id);
160
161
    Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
162
                      TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
163
                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery,
164
                      std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr);
165
166
    // remove a txn from txn manager
167
    // not persist rowset meta because
168
    Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
169
                       TTabletId tablet_id, TabletUid tablet_uid, const Version& version,
170
                       TabletPublishStatistics* stats,
171
                       std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info);
172
173
    // only abort not committed txn
174
    void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
175
                   TabletUid tablet_uid);
176
177
    // delete the txn from manager if it is not committed(not have a valid rowset)
178
    Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
179
                        TTabletId tablet_id, TabletUid tablet_uid);
180
181
    // remove the txn from txn manager
182
    // delete the related rowset if it is not null
183
    // delete rowset related data if it is not null
184
    Status delete_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
185
                      TTabletId tablet_id, TabletUid tablet_uid);
186
187
    void get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, int64_t* partition_id,
188
                                 std::set<int64_t>* transaction_ids);
189
190
    void get_txn_related_tablets(const TTransactionId transaction_id, TPartitionId partition_ids,
191
                                 std::map<TabletInfo, RowsetSharedPtr>* tablet_infos);
192
193
    void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);
194
195
    // Get all expired txns and save them in expire_txn_map.
196
    // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets.
197
    void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map);
198
199
    void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
200
                                            TabletUid tablet_uid);
201
202
    void get_partition_ids(const TTransactionId transaction_id,
203
                           std::vector<TPartitionId>* partition_ids);
204
205
    void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
206
                                     DeltaWriter* delta_writer);
207
    void clear_txn_tablet_delta_writer(int64_t transaction_id);
208
    void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, int64_t node_id,
209
                                         bool is_succeed);
210
211
    void set_txn_related_delete_bitmap(TPartitionId partition_id, TTransactionId transaction_id,
212
                                       TTabletId tablet_id, TabletUid tablet_uid,
213
                                       bool unique_key_merge_on_write,
214
                                       DeleteBitmapPtr delete_bitmap,
215
                                       const RowsetIdUnorderedSet& rowset_ids,
216
                                       std::shared_ptr<PartialUpdateInfo> partial_update_info);
217
    void get_all_commit_tablet_txn_info_by_tablet(
218
            const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec);
219
220
    int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
221
    void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id);
222
223
    TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
224
                           TTabletId tablet_id, TabletUid tablet_uid);
225
226
    void remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id,
227
                                TTabletId tablet_id, TabletUid tablet_uid);
228
229
private:
230
    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
231
232
    // Implement TxnKey hash function to support TxnKey as a key for `unordered_map`.
233
    struct TxnKeyHash {
234
        template <typename T, typename U>
235
221
        size_t operator()(const std::pair<T, U>& e) const {
236
221
            return std::hash<T>()(e.first) ^ std::hash<U>()(e.second);
237
221
        }
238
    };
239
240
    // Implement TxnKey equal function to support TxnKey as a key for `unordered_map`.
241
    struct TxnKeyEqual {
242
        template <class T, typename U>
243
126
        bool operator()(const std::pair<T, U>& l, const std::pair<T, U>& r) const {
244
126
            return l.first == r.first && l.second == r.second;
245
126
        }
246
    };
247
248
    using txn_tablet_map_t =
249
            std::unordered_map<TxnKey, std::map<TabletInfo, std::shared_ptr<TabletTxnInfo>>,
250
                               TxnKeyHash, TxnKeyEqual>;
251
    using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>;
252
    using txn_tablet_delta_writer_map_t =
253
            std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>;
254
255
    std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId);
256
257
    txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId);
258
259
    txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
260
261
    inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId);
262
263
    std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId);
264
265
    txn_tablet_delta_writer_map_t& _get_txn_tablet_delta_writer_map(TTransactionId transactionId);
266
267
    // Insert or remove (transaction_id, partition_id) from _txn_partition_map
268
    // get _txn_map_lock before calling.
269
    void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
270
    void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
271
272
    void _remove_txn_tablet_info_unlocked(TPartitionId partition_id, TTransactionId transaction_id,
273
                                          TTabletId tablet_id, TabletUid tablet_uid,
274
                                          std::lock_guard<std::shared_mutex>& txn_lock,
275
                                          std::lock_guard<std::shared_mutex>& wrlock);
276
277
    class TabletVersionCache : public LRUCachePolicyTrackingManual {
278
    public:
279
        TabletVersionCache(size_t capacity)
280
                : LRUCachePolicyTrackingManual(CachePolicy::CacheType::TABLET_VERSION_CACHE,
281
                                               capacity, LRUCacheType::NUMBER, -1,
282
                                               DEFAULT_LRU_CACHE_NUM_SHARDS,
283
153
                                               DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, false) {}
284
    };
285
286
private:
287
    StorageEngine& _engine;
288
289
    const int32_t _txn_map_shard_size;
290
291
    const int32_t _txn_shard_size;
292
293
    // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size
294
    txn_tablet_map_t* _txn_tablet_maps = nullptr;
295
    // transaction_id -> corresponding partition ids
296
    // This is mainly for the clear txn task received from FE, which may only has transaction id,
297
    // so we need this map to find out which partitions are corresponding to a transaction id.
298
    // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]'
299
    txn_partition_map_t* _txn_partition_maps = nullptr;
300
301
    std::shared_mutex* _txn_map_locks = nullptr;
302
303
    std::shared_mutex* _txn_mutex = nullptr;
304
305
    txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr;
306
    std::unique_ptr<TabletVersionCache> _tablet_version_cache;
307
    std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr;
308
    DISALLOW_COPY_AND_ASSIGN(TxnManager);
309
}; // TxnManager
310
311
161
// Returns the entry of _txn_map_locks for the shard selected by `transactionId`.
inline std::shared_mutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) {
    // Masking with (shard size - 1) picks the shard; this matches a modulo
    // only when _txn_map_shard_size is a power of two.
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_map_locks[shard_idx];
}
314
315
159
// Returns the entry of _txn_tablet_maps for the shard selected by `transactionId`.
inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) {
    // Same shard selection as _get_txn_map_lock(): mask with (shard size - 1).
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_tablet_maps[shard_idx];
}
318
319
// Returns the entry of _txn_partition_maps for the shard selected by `transactionId`.
inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(
        TTransactionId transactionId) {
    // Shard index is the transaction id masked with (_txn_map_shard_size - 1).
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_partition_maps[shard_idx];
}
323
324
56
// Returns the entry of _txn_mutex for the shard selected by `transactionId`.
inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
    // Unlike the map accessors, this one is sharded by _txn_shard_size.
    const auto shard_idx = transactionId & (_txn_shard_size - 1);
    return _txn_mutex[shard_idx];
}
327
328
inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock(
329
0
        TTransactionId transactionId) {
330
0
    return _txn_tablet_delta_writer_map_locks[transactionId & (_txn_map_shard_size - 1)];
331
0
}
332
333
// Returns the entry of _txn_tablet_delta_writer_map for the shard selected by
// `transactionId`.
inline TxnManager::txn_tablet_delta_writer_map_t& TxnManager::_get_txn_tablet_delta_writer_map(
        TTransactionId transactionId) {
    // Uses the same index as _get_txn_tablet_delta_writer_map_lock().
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_tablet_delta_writer_map[shard_idx];
}
337
338
} // namespace doris