Coverage Report

Created: 2026-05-12 22:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/txn/txn_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/types.pb.h>
23
#include <stddef.h>
24
#include <stdint.h>
25
26
#include <boost/container/detail/std_fwd.hpp>
27
#include <map>
28
#include <memory>
29
#include <mutex>
30
#include <set>
31
#include <shared_mutex>
32
#include <unordered_map>
33
#include <unordered_set>
34
#include <utility>
35
#include <vector>
36
37
#include "common/status.h"
38
#include "core/block/block.h"
39
#include "runtime/memory/lru_cache_policy.h"
40
#include "storage/olap_common.h"
41
#include "storage/rowset/pending_rowset_helper.h"
42
#include "storage/rowset/rowset.h"
43
#include "storage/rowset/rowset_meta.h"
44
#include "storage/segment/segment.h"
45
#include "storage/segment/segment_writer.h"
46
#include "storage/tablet/tablet.h"
47
#include "storage/tablet/tablet_meta.h"
48
#include "util/time.h"
49
50
namespace doris {
51
class DeltaWriter;
52
class OlapMeta;
53
struct TabletPublishStatistics;
54
struct PartialUpdateInfo;
55
56
// State of one tablet's entry in a transaction; stored in TabletTxnInfo::state,
// which defaults to PREPARED.
// TabletTxnInfo::prepare()/commit()/rollback() set PREPARED/COMMITTED/ROLLEDBACK,
// and TabletTxnInfo::abort() sets ABORTED only when the current state is PREPARED.
// NOTE(review): NOT_FOUND and DELETED are never assigned in this header; they are
// presumably produced by TxnManager (e.g. get_txn_state) for missing or removed
// entries -- confirm against txn_manager.cpp.
enum class TxnState {
57
    NOT_FOUND = 0,
58
    PREPARED = 1,
59
    COMMITTED = 2,
60
    ROLLEDBACK = 3,
61
    ABORTED = 4,
62
    DELETED = 5,
63
};
64
// Publish progress marker held (via shared_ptr) in TabletTxnInfo::publish_status.
// Per the comment on that field it is cloud-only and lets a retried
// CloudTabletCalcDeleteBitmapTask decide whether the delete bitmap must be recomputed.
enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 };
65
66
// Values recorded for a publish attempt; embedded in TabletTxnInfo as `publish_info`.
// Every field defaults to -1.
// NOTE(review): -1 presumably means "not recorded yet", and the compaction counters /
// cumulative point presumably snapshot tablet state at publish time -- confirm with the
// code that fills this struct.
struct TxnPublishInfo {
67
    int64_t publish_version {-1};
68
    int64_t base_compaction_cnt {-1};
69
    int64_t cumulative_compaction_cnt {-1};
70
    int64_t cumulative_point {-1};
71
};
72
73
struct TabletTxnInfo {
74
    PUniqueId load_id;
75
    RowsetSharedPtr rowset;
76
    // The list of rowsets committed along with the transaction rowset
77
    // currently contains only the binlog<Row> rowset.
78
    std::vector<RowsetSharedPtr> attach_rowsets;
79
    PendingRowsetGuard pending_rs_guard;
80
    bool unique_key_merge_on_write {false};
81
    DeleteBitmapPtr delete_bitmap;
82
    // copy delete_bitmap of data rowset to binlog
83
    DeleteBitmapPtr binlog_delvec;
84
    // records rowsets calc in commit txn
85
    RowsetIdUnorderedSet rowset_ids;
86
    int64_t creation_time;
87
    bool ingest {false};
88
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
89
90
    // for cloud only, used to determine if a retry CloudTabletCalcDeleteBitmapTask
91
    // needs to re-calculate the delete bitmap
92
    std::shared_ptr<PublishStatus> publish_status;
93
    TxnPublishInfo publish_info;
94
95
    // for cloud only, used to calculate delete bitmap for txn load
96
    bool is_txn_load = false;
97
    std::vector<RowsetSharedPtr> invisible_rowsets;
98
    int64_t lock_id;
99
    int64_t next_visible_version;
100
101
    TxnState state {TxnState::PREPARED};
102
0
    TabletTxnInfo() = default;
103
104
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
105
36
            : load_id(std::move(load_id)),
106
36
              rowset(std::move(rowset)),
107
36
              creation_time(UnixSeconds()) {}
108
109
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
110
36
            : load_id(std::move(load_id)),
111
36
              rowset(std::move(rowset)),
112
36
              creation_time(UnixSeconds()),
113
36
              ingest(ingest_arg) {}
114
115
    TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool merge_on_write,
116
                  DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids)
117
            : load_id(std::move(load_id)),
118
              rowset(std::move(rowset)),
119
              unique_key_merge_on_write(merge_on_write),
120
              delete_bitmap(std::move(delete_bitmap)),
121
              rowset_ids(std::move(ids)),
122
0
              creation_time(UnixSeconds()) {}
123
124
36
    void prepare() { state = TxnState::PREPARED; }
125
36
    void commit() { state = TxnState::COMMITTED; }
126
0
    void rollback() { state = TxnState::ROLLEDBACK; }
127
0
    void abort() {
128
0
        if (state == TxnState::PREPARED) {
129
0
            state = TxnState::ABORTED;
130
0
        }
131
0
    }
132
};
133
134
// Per-transaction record for a tablet: the transaction/partition ids (both default to 0)
// plus the delete bitmap, rowset-id set and partial-update info associated with it.
// A vector of these (CommitTabletTxnInfoVec) is the output parameter of
// TxnManager::get_all_commit_tablet_txn_info_by_tablet().
struct CommitTabletTxnInfo {
135
    TTransactionId transaction_id {0};
136
    TPartitionId partition_id {0};
137
    DeleteBitmapPtr delete_bitmap;
138
    RowsetIdUnorderedSet rowset_ids;
139
    std::shared_ptr<PartialUpdateInfo> partial_update_info;
140
};
141
142
using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>;
143
144
// txn manager is used to manage mapping between tablet and txns
145
class TxnManager {
146
public:
147
    TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size);
148
149
375
    ~TxnManager() {
150
375
        delete[] _txn_tablet_maps;
151
375
        delete[] _txn_partition_maps;
152
375
        delete[] _txn_map_locks;
153
375
        delete[] _txn_mutex;
154
375
        delete[] _txn_tablet_delta_writer_map;
155
375
        delete[] _txn_tablet_delta_writer_map_locks;
156
375
    }
157
158
    class CacheValue : public LRUCacheValueBase {
159
    public:
160
        int64_t value;
161
    };
162
163
    // add a txn to manager
164
    // partition id is useful in publish version stage because version is associated with partition
165
    Status prepare_txn(TPartitionId partition_id, const Tablet& tablet,
166
                       TTransactionId transaction_id, const PUniqueId& load_id,
167
                       bool is_ingest = false);
168
    // most used for ut
169
    Status prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
170
                       TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
171
                       bool is_ingest = false);
172
173
    Status commit_txn(TPartitionId partition_id, const Tablet& tablet,
174
                      TTransactionId transaction_id, const PUniqueId& load_id,
175
                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery,
176
                      std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr,
177
                      std::vector<RowsetSharedPtr>* attach_rowsets = nullptr);
178
179
    Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
180
                       TTransactionId transaction_id, const Version& version,
181
                       TabletPublishStatistics* stats,
182
                       std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
183
                       const int64_t commit_tso = -1);
184
185
    // delete the txn from manager if it is not committed(not have a valid rowset)
186
    Status rollback_txn(TPartitionId partition_id, const Tablet& tablet,
187
                        TTransactionId transaction_id);
188
189
    Status delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
190
                      TTransactionId transaction_id);
191
192
    Status commit_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
193
                      TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
194
                      const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, bool is_recovery,
195
                      std::shared_ptr<PartialUpdateInfo> partial_update_info = nullptr,
196
                      std::vector<RowsetSharedPtr>* attach_rowsets = nullptr);
197
198
    // remove a txn from txn manager
199
    // not persist rowset meta because
200
    Status publish_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
201
                       TTabletId tablet_id, TabletUid tablet_uid, const Version& version,
202
                       TabletPublishStatistics* stats,
203
                       std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
204
                       const int64_t commit_tso = -1);
205
206
    // only abort not committed txn
207
    void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
208
                   TabletUid tablet_uid);
209
210
    // delete the txn from manager if it is not committed(not have a valid rowset)
211
    Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
212
                        TTabletId tablet_id, TabletUid tablet_uid);
213
214
    // remove the txn from txn manager
215
    // delete the related rowset if it is not null
216
    // delete rowset related data if it is not null
217
    Status delete_txn(OlapMeta* meta, TPartitionId partition_id, TTransactionId transaction_id,
218
                      TTabletId tablet_id, TabletUid tablet_uid);
219
220
    void get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, int64_t* partition_id,
221
                                 std::set<int64_t>* transaction_ids);
222
223
    void get_txn_related_tablets(
224
            const TTransactionId transaction_id, TPartitionId partition_ids,
225
            std::map<TabletInfo, RowsetSharedPtr>* tablet_infos,
226
            std::map<TabletInfo, std::vector<RowsetSharedPtr>>* tablet_attach_rowsets = nullptr);
227
228
    void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);
229
230
    // Get all expired txns and save them in expire_txn_map.
231
    // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets.
232
    void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map);
233
234
    void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
235
                                            TabletUid tablet_uid);
236
237
    void get_partition_ids(const TTransactionId transaction_id,
238
                           std::vector<TPartitionId>* partition_ids);
239
240
    void add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
241
                                     DeltaWriter* delta_writer);
242
    void clear_txn_tablet_delta_writer(int64_t transaction_id);
243
    void finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, int64_t node_id,
244
                                         bool is_succeed);
245
246
    void set_txn_related_delete_bitmap(TPartitionId partition_id, TTransactionId transaction_id,
247
                                       TTabletId tablet_id, TabletUid tablet_uid,
248
                                       bool unique_key_merge_on_write,
249
                                       DeleteBitmapPtr delete_bitmap,
250
                                       const RowsetIdUnorderedSet& rowset_ids,
251
                                       std::shared_ptr<PartialUpdateInfo> partial_update_info);
252
    void get_all_commit_tablet_txn_info_by_tablet(
253
            const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec);
254
255
    int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
256
    void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id);
257
258
    TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
259
                           TTabletId tablet_id, TabletUid tablet_uid);
260
261
    void remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id,
262
                                TTabletId tablet_id, TabletUid tablet_uid);
263
264
private:
265
    using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
266
267
    // Implement TxnKey hash function to support TxnKey as a key for `unordered_map`.
268
    struct TxnKeyHash {
269
        template <typename T, typename U>
270
269
        size_t operator()(const std::pair<T, U>& e) const {
271
269
            return std::hash<T>()(e.first) ^ std::hash<U>()(e.second);
272
269
        }
273
    };
274
275
    // Implement TxnKey equal function to support TxnKey as a key for `unordered_map`.
276
    struct TxnKeyEqual {
277
        template <class T, typename U>
278
142
        bool operator()(const std::pair<T, U>& l, const std::pair<T, U>& r) const {
279
142
            return l.first == r.first && l.second == r.second;
280
142
        }
281
    };
282
283
    using txn_tablet_map_t =
284
            std::unordered_map<TxnKey, std::map<TabletInfo, std::shared_ptr<TabletTxnInfo>>,
285
                               TxnKeyHash, TxnKeyEqual>;
286
    using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>;
287
    using txn_tablet_delta_writer_map_t =
288
            std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>;
289
290
    std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId);
291
292
    txn_tablet_map_t& _get_txn_tablet_map(TTransactionId transactionId);
293
294
    txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
295
296
    inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId);
297
298
    std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId transactionId);
299
300
    txn_tablet_delta_writer_map_t& _get_txn_tablet_delta_writer_map(TTransactionId transactionId);
301
302
    // Insert or remove (transaction_id, partition_id) from _txn_partition_map
303
    // get _txn_map_lock before calling.
304
    void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
305
    void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
306
307
    void _remove_txn_tablet_info_unlocked(TPartitionId partition_id, TTransactionId transaction_id,
308
                                          TTabletId tablet_id, TabletUid tablet_uid,
309
                                          std::lock_guard<std::shared_mutex>& txn_lock,
310
                                          std::lock_guard<std::shared_mutex>& wrlock);
311
312
    class TabletVersionCache : public LRUCachePolicy {
313
    public:
314
        TabletVersionCache(size_t capacity)
315
375
                : LRUCachePolicy(CachePolicy::CacheType::TABLET_VERSION_CACHE, capacity,
316
375
                                 LRUCacheType::NUMBER, /*sweeptime*/ -1,
317
375
                                 /*num_shards*/ 32,
318
375
                                 /*element_count_capacity*/ 0, /*enable_prune*/ false,
319
375
                                 /*is_lru_k*/ false) {}
320
    };
321
322
private:
323
    StorageEngine& _engine;
324
325
    const int32_t _txn_map_shard_size;
326
327
    const int32_t _txn_shard_size;
328
329
    // _txn_map_locks[i] protect _txn_tablet_maps[i], i=0,1,2...,and i < _txn_map_shard_size
330
    txn_tablet_map_t* _txn_tablet_maps = nullptr;
331
    // transaction_id -> corresponding partition ids
332
    // This is mainly for the clear txn task received from FE, which may only has transaction id,
333
    // so we need this map to find out which partitions are corresponding to a transaction id.
334
    // The _txn_partition_maps[i] should be constructed/deconstructed/modified alongside with '_txn_tablet_maps[i]'
335
    txn_partition_map_t* _txn_partition_maps = nullptr;
336
337
    std::shared_mutex* _txn_map_locks = nullptr;
338
339
    std::shared_mutex* _txn_mutex = nullptr;
340
341
    txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map = nullptr;
342
    std::unique_ptr<TabletVersionCache> _tablet_version_cache;
343
    std::shared_mutex* _txn_tablet_delta_writer_map_locks = nullptr;
344
    DISALLOW_COPY_AND_ASSIGN(TxnManager);
345
}; // TxnManager
346
347
199
// Returns the lock of the txn-map shard selected by masking the transaction id with
// (_txn_map_shard_size - 1).
inline std::shared_mutex& TxnManager::_get_txn_map_lock(TTransactionId transactionId) {
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_map_locks[shard_idx];
}
350
351
197
// Returns the txn -> tablet map shard selected by masking the transaction id with
// (_txn_map_shard_size - 1).
inline TxnManager::txn_tablet_map_t& TxnManager::_get_txn_tablet_map(TTransactionId transactionId) {
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_tablet_maps[shard_idx];
}
354
355
// Returns the txn -> partition-ids map shard selected by masking the transaction id with
// (_txn_map_shard_size - 1).
inline TxnManager::txn_partition_map_t& TxnManager::_get_txn_partition_map(
        TTransactionId transactionId) {
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_partition_maps[shard_idx];
}
359
360
76
// Returns the per-transaction mutex shard. Unlike the map accessors, this one masks the
// transaction id with (_txn_shard_size - 1).
inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
    const auto shard_idx = transactionId & (_txn_shard_size - 1);
    return _txn_mutex[shard_idx];
}
363
364
inline std::shared_mutex& TxnManager::_get_txn_tablet_delta_writer_map_lock(
365
0
        TTransactionId transactionId) {
366
0
    return _txn_tablet_delta_writer_map_locks[transactionId & (_txn_map_shard_size - 1)];
367
0
}
368
369
// Returns the txn -> tablet -> DeltaWriter map shard selected by masking the transaction
// id with (_txn_map_shard_size - 1).
inline TxnManager::txn_tablet_delta_writer_map_t& TxnManager::_get_txn_tablet_delta_writer_map(
        TTransactionId transactionId) {
    const auto shard_idx = transactionId & (_txn_map_shard_size - 1);
    return _txn_tablet_delta_writer_map[shard_idx];
}
373
374
} // namespace doris