Coverage Report

Created: 2026-03-30 16:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/txn/txn_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/txn/txn_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <fmt/ranges.h>
23
#include <thrift/protocol/TDebugProtocol.h>
24
#include <time.h>
25
26
#include <filesystem>
27
#include <iterator>
28
#include <list>
29
#include <new>
30
#include <ostream>
31
#include <queue>
32
#include <set>
33
#include <string>
34
35
#include "common/config.h"
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "load/delta_writer/delta_writer.h"
39
#include "storage/data_dir.h"
40
#include "storage/olap_common.h"
41
#include "storage/partial_update_info.h"
42
#include "storage/rowset/beta_rowset.h"
43
#include "storage/rowset/pending_rowset_helper.h"
44
#include "storage/rowset/rowset_meta.h"
45
#include "storage/rowset/rowset_meta_manager.h"
46
#include "storage/schema_change/schema_change.h"
47
#include "storage/segment/segment_loader.h"
48
#include "storage/storage_engine.h"
49
#include "storage/tablet/tablet_manager.h"
50
#include "storage/tablet/tablet_meta.h"
51
#include "storage/tablet/tablet_meta_manager.h"
52
#include "storage/task/engine_publish_version_task.h"
53
#include "util/debug_points.h"
54
#include "util/time.h"
55
56
namespace doris {
57
class OlapMeta;
58
} // namespace doris
59
60
using std::map;
61
using std::pair;
62
using std::set;
63
using std::string;
64
using std::stringstream;
65
using std::vector;
66
67
namespace doris {
68
using namespace ErrorCode;
69
70
// Gauge of live (partition_id, txn_id) keys held in the txn tablet maps: prepare_txn adds 1 when it
// creates a new key, and _remove_txn_tablet_info_unlocked subtracts 1 when a key's last tablet
// entry is erased.
bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count");
71
72
// Builds the sharded containers used by the txn manager.
//
// @param engine              storage engine this manager belongs to (kept by reference)
// @param txn_map_shard_size  number of shards for the txn maps and their locks
// @param txn_shard_size      number of shards for the per-txn mutexes
//
// Both shard sizes are expected to be positive powers of two; this is only enforced by DCHECK,
// i.e. in debug builds.
TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
        : _engine(engine),
          _txn_map_shard_size(txn_map_shard_size),
          _txn_shard_size(txn_shard_size) {
    // x & (x - 1) == 0 holds only for powers of two once x > 0 is established.
    DCHECK_GT(_txn_map_shard_size, 0);
    DCHECK_GT(_txn_shard_size, 0);
    DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
    DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);

    const auto map_shards = _txn_map_shard_size;
    const auto txn_shards = _txn_shard_size;

    // Txn map shards and the lock guarding each of them.
    _txn_map_locks = new std::shared_mutex[map_shards];
    _txn_tablet_maps = new txn_tablet_map_t[map_shards];
    _txn_partition_maps = new txn_partition_map_t[map_shards];

    // Per-txn mutex shards.
    _txn_mutex = new std::shared_mutex[txn_shards];

    // Delta writer map shards and their locks (sized like the txn map shards).
    _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[map_shards];
    _txn_tablet_delta_writer_map_locks = new std::shared_mutex[map_shards];

    // For debugging
    _tablet_version_cache = std::make_unique<TabletVersionCache>(100000);
}
89
90
// prepare txn should always be allowed because ingest task will be retried
91
// could not distinguish rollup, schema change or base table, prepare txn successfully will allow
92
// ingest retried
93
// Convenience overload: validates the tablet state, then forwards to the id-based prepare_txn
// using the tablet's id and uid.
//
// Returns InternalError if the tablet is already in TABLET_SHUTDOWN state, otherwise whatever
// the id-based overload returns.
Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet,
                               TTransactionId transaction_id, const PUniqueId& load_id,
                               bool ingest) {
    // A shutdown tablet is an old tablet; importing data into it may lead to data loss during
    // migration, so only a live tablet is allowed to proceed.
    const bool tablet_is_live = tablet.tablet_state() != TABLET_SHUTDOWN;
    if (tablet_is_live) {
        return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(),
                           load_id, ingest);
    }

    return Status::InternalError<false>(
            "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped "
            "or migrationed. Please check if the table has been dropped or try again.",
            tablet.tablet_id());
}
108
109
// mostly used by unit tests
110
// Registers (partition_id, transaction_id, tablet) in the txn map in the "prepared" state,
// i.e. with no rowset attached yet.
//
// @param partition_id    partition the load writes to
// @param transaction_id  id of the load transaction
// @param tablet_id       target tablet id
// @param tablet_uid      target tablet uid
// @param load_id         id of the load that owns the txn
// @param ingest          forwarded to the TabletTxnInfo constructor
//
// @return OK when the txn was registered, or when an entry with the same load id that already
//         carries a rowset exists; TOO_MANY_TRANSACTIONS when the partition map shard for this
//         txn exceeds config::max_runnings_transactions_per_txn_map; InternalError only when
//         the random-failure debug point fires.
//
// The whole function runs under the exclusive txn-map lock of the txn's shard.
Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
                               TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
                               bool ingest) {
    TxnKey key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);

    DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug prepare txn random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.prepare_txn.wait")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    /// Step 1: check if the transaction already exists
    if (auto iter = txn_tablet_map.find(key); iter != txn_tablet_map.end()) {
        auto& txn_tablet_info_map = iter->second;
        if (auto load_itr = txn_tablet_info_map.find(tablet_info);
            load_itr != txn_tablet_info_map.end()) {
            // Found an entry for (txn, tablet). If it belongs to the same load and already has
            // a rowset, treat this call as a duplicate and keep the existing entry. In every
            // other case fall through and overwrite the entry in Step 3.
            auto& load_info = load_itr->second;
            const bool same_load = load_info->load_id.hi() == load_id.hi() &&
                                   load_info->load_id.lo() == load_id.lo();
            if (same_load && load_info->rowset != nullptr) {
                LOG(WARNING) << "find transaction exists when add to engine."
                             << "partition_id: " << key.first << ", transaction_id: " << key.second
                             << ", tablet: " << tablet_info.to_string();
                return Status::OK();
            }
        }
    }

    /// Step 2: reject the request if there are too many running transactions.
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
    if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
        // Report the size of the map that was actually checked against the limit.
        return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}",
                                                    txn_partition_map.size(),
                                                    config::max_runnings_transactions_per_txn_map);
    }

    /// Step 3: Add transaction to engine
    // The entry starts without a rowset (rowset = nullptr); commit_txn attaches it later.
    auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest);
    load_info->prepare();
    if (!txn_tablet_map.contains(key)) {
        // A new (partition, txn) key is about to be created: keep the gauge in sync.
        g_tablet_txn_info_txn_partitions_count << 1;
    }
    txn_tablet_map[key][tablet_info] = std::move(load_info);
    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
    VLOG_NOTICE << "add transaction to engine successfully."
                << "partition_id: " << key.first << ", transaction_id: " << key.second
                << ", tablet: " << tablet_info.to_string();
    return Status::OK();
}
189
190
// Convenience overload: resolves the meta store, tablet id and tablet uid from `tablet` and
// forwards to the OlapMeta-based commit_txn. All other arguments are passed through unchanged.
Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
                              TTransactionId transaction_id, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    // `partial_update_info` is a by-value sink parameter: move it on instead of paying for
    // another shared_ptr reference-count round trip.
    return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id,
                      tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr,
                      std::move(guard), is_recovery, std::move(partial_update_info));
}
199
200
// Convenience overload: takes the meta store, tablet id and tablet uid from `tablet` and
// delegates to the OlapMeta-based publish_txn. `tablet` is dereferenced without a null check.
Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                               TTransactionId transaction_id, const Version& version,
                               TabletPublishStatistics* stats,
                               std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
                               const int64_t commit_tso) {
    OlapMeta* meta = tablet->data_dir()->get_meta();
    return publish_txn(meta, partition_id, transaction_id, tablet->tablet_id(),
                       tablet->tablet_uid(), version, stats, extend_tablet_txn_info, commit_tso);
}
209
210
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
211
0
                           TTabletId tablet_id, TabletUid tablet_uid) {
212
0
    pair<int64_t, int64_t> key(partition_id, transaction_id);
213
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
214
215
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
216
217
0
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
218
0
    auto it = txn_tablet_map.find(key);
219
0
    if (it == txn_tablet_map.end()) {
220
0
        return;
221
0
    }
222
223
0
    auto& tablet_txn_info_map = it->second;
224
0
    auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
225
0
    if (tablet_txn_info_iter == tablet_txn_info_map.end()) {
226
0
        return;
227
0
    }
228
229
0
    auto& txn_info = tablet_txn_info_iter->second;
230
0
    txn_info->abort();
231
0
}
232
233
// delete the txn from the manager if it is not committed (i.e. it does not have a valid rowset)
234
// Convenience overload: extracts the tablet id and uid from `tablet` and delegates to the
// id-based rollback_txn.
Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet,
                                TTransactionId transaction_id) {
    const auto target_tablet_id = tablet.tablet_id();
    const auto target_tablet_uid = tablet.tablet_uid();
    return rollback_txn(partition_id, transaction_id, target_tablet_id, target_tablet_uid);
}
238
239
// Convenience overload: takes the meta store, tablet id and tablet uid from `tablet` and
// delegates to the OlapMeta-based delete_txn. `tablet` is dereferenced without a null check.
Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                              TTransactionId transaction_id) {
    OlapMeta* meta = tablet->data_dir()->get_meta();
    return delete_txn(meta, partition_id, transaction_id, tablet->tablet_id(),
                      tablet->tablet_uid());
}
244
245
void TxnManager::set_txn_related_delete_bitmap(
246
        TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
247
        TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap,
248
        const RowsetIdUnorderedSet& rowset_ids,
249
6.04k
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
250
6.04k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
251
6.04k
    TabletInfo tablet_info(tablet_id, tablet_uid);
252
253
6.04k
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
254
6.04k
    {
255
        // get tx
256
6.04k
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
257
6.04k
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
258
6.04k
        auto it = txn_tablet_map.find(key);
259
6.04k
        if (it == txn_tablet_map.end()) {
260
1
            LOG(WARNING) << "transaction_id: " << transaction_id
261
1
                         << " partition_id: " << partition_id << " may be cleared";
262
1
            return;
263
1
        }
264
6.04k
        auto load_itr = it->second.find(tablet_info);
265
6.04k
        if (load_itr == it->second.end()) {
266
0
            LOG(WARNING) << "transaction_id: " << transaction_id
267
0
                         << " partition_id: " << partition_id << " tablet_id: " << tablet_id
268
0
                         << " may be cleared";
269
0
            return;
270
0
        }
271
6.04k
        auto& load_info = load_itr->second;
272
6.04k
        load_info->unique_key_merge_on_write = unique_key_merge_on_write;
273
6.04k
        load_info->delete_bitmap = delete_bitmap;
274
6.04k
        load_info->rowset_ids = rowset_ids;
275
6.04k
        load_info->partial_update_info = partial_update_info;
276
6.04k
    }
277
6.04k
}
278
279
// Attaches `rowset_ptr` to the txn entry of (partition_id, transaction_id, tablet) and marks it
// committed. Unless `is_recovery` is set, the rowset meta (and the partial update info, for
// partial updates) is persisted to `meta` first.
//
// @param meta                 meta store used for persistence
// @param partition_id, transaction_id, tablet_id, tablet_uid  identify the txn entry
// @param load_id              id of the load that owns the txn
// @param rowset_ptr           rowset produced by the load; must not be null
// @param guard                pending-rowset guard, stored on the txn entry
// @param is_recovery          true when the txn is being reloaded; skips persistence and instead
//                             tries to read back the partial update info from `meta`
// @param partial_update_info  optional partial update description to persist
//
// @return OK on success or when the same rowset was already committed for this load;
//         PUSH_TRANSACTION_ALREADY_EXIST when the same load already committed a different rowset;
//         ROWSET_INVALID when `rowset_ptr` is null; InternalError for invalid ids;
//         otherwise the failing status of the meta store operation.
//
// Holds the exclusive txn lock for the whole call; the txn-map lock is taken shared for the
// duplicate check and exclusive for the final insert, so disk access happens outside of it.
Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                              TTransactionId transaction_id, TTabletId tablet_id,
                              TabletUid tablet_uid, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
        LOG(WARNING) << "invalid commit req "
                     << " partition_id=" << partition_id << " transaction_id=" << transaction_id
                     << " tablet_id=" << tablet_id;
        // Any of the three ids may be the invalid one, so report all of them.
        return Status::InternalError(
                "invalid commit req, partition_id={}, transaction_id={}, tablet_id={}",
                partition_id, transaction_id, tablet_id);
    }

    pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    if (rowset_ptr == nullptr) {
        return Status::Error<ROWSET_INVALID>(
                "could not commit txn because rowset ptr is null. partition_id: {}, "
                "transaction_id: {}, tablet: {}",
                key.first, key.second, tablet_info.to_string());
    }

    DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.commit_txn.random_failed")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug commit txn random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.commit_txn.wait")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
    // This loop body runs exactly once; do/while(false) is only used so that `break` can skip
    // the rest of the duplicate check.
    do {
        std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
        {
            // Bind by reference: only two fields are inspected, so there is no need to copy
            // the rowset protobuf. The reference is not used after set_partition_id().
            const auto& rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
            // TODO(dx): remove log after fix partition id eq 0 bug
            if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
                rowset_ptr->rowset_meta()->set_partition_id(partition_id);
                LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
                             << partition_id;
            }
        }
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        auto it = txn_tablet_map.find(key);
        if (it == txn_tablet_map.end()) {
            break;
        }

        auto load_itr = it->second.find(tablet_info);
        if (load_itr == it->second.end()) {
            break;
        }

        // Found an entry for (txn, tablet).
        auto& load_info = load_itr->second;
        // No rowset attached yet: this is the normal first commit after prepare_txn.
        if (load_info->rowset == nullptr) {
            break;
        }

        // A rowset exists but belongs to another load: overwrite the entry below.
        if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) {
            break;
        }

        // find a rowset with same rowset id, then it means a duplicate call
        if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) {
            LOG(INFO) << "find rowset exists when commit transaction to engine."
                      << "partition_id: " << key.first << ", transaction_id: " << key.second
                      << ", tablet: " << tablet_info.to_string()
                      << ", rowset_id: " << load_info->rowset->rowset_id();
            // Should not remove this rowset from pending rowsets
            load_info->pending_rs_guard = std::move(guard);
            return Status::OK();
        }

        // find a rowset with different rowset id, then it should not happen, just return errors
        return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
                "find rowset exists when commit transaction to engine. but rowset ids are not "
                "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new "
                "rowset_id: {}",
                key.first, key.second, tablet_info.to_string(),
                load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string());
    } while (false);

    // if not in recovery mode, then should persist the meta to meta env
    // save meta need access disk, it maybe very slow, so that it is not in global txn lock
    // it is under a single txn lock
    if (!is_recovery) {
        Status save_status =
                RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
                                        rowset_ptr->rowset_meta()->get_rowset_pb(), false);
        DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
            if (auto wait = dp->param<int>("duration", 0); wait > 0) {
                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
                        .tag("txn_id", transaction_id)
                        .tag("tablet_id", tablet_id)
                        .tag("wait ms", wait);
                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
            }
        });
        if (!save_status.ok()) {
            save_status.append(fmt::format(", txn id: {}", transaction_id));
            return save_status;
        }

        if (partial_update_info && partial_update_info->is_partial_update()) {
            PartialUpdateInfoPB partial_update_info_pb;
            partial_update_info->to_pb(&partial_update_info_pb);
            save_status = RowsetMetaManager::save_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, partial_update_info_pb);
            if (!save_status.ok()) {
                save_status.append(fmt::format(", txn_id: {}", transaction_id));
                return save_status;
            }
        }
    }

    TabletSharedPtr tablet;
    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
    if (is_recovery) {
        // During recovery the partial update info is read back from the meta store (outside of
        // the txn-map lock) instead of being taken from the caller.
        tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
            PartialUpdateInfoPB partial_update_info_pb;
            auto st = RowsetMetaManager::try_get_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb);
            if (st.ok()) {
                decoded_partial_update_info = std::make_shared<PartialUpdateInfo>();
                decoded_partial_update_info->from_pb(&partial_update_info_pb);
                DCHECK(decoded_partial_update_info->is_partial_update());
            } else if (!st.is<META_KEY_NOT_FOUND>()) {
                // META_KEY_NOT_FOUND just means the load is not a partial update; any other
                // error is a real failure.
                return st;
            }
        }
    }

    {
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
        auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
        load_info->pending_rs_guard = std::move(guard);
        if (is_recovery) {
            if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
                load_info->unique_key_merge_on_write = true;
                load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
                if (decoded_partial_update_info) {
                    LOG_INFO(
                            "get partial update info from RocksDB during recovery. txn_id={}, "
                            "partition_id={}, tablet_id={}, partial_update_info=[{}]",
                            transaction_id, partition_id, tablet_id,
                            decoded_partial_update_info->summary());
                    load_info->partial_update_info = decoded_partial_update_info;
                }
            }
        }
        load_info->commit();

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        if (!txn_tablet_map.contains(key)) {
            // The key can be absent here (e.g. no prior prepare_txn for it). Count the newly
            // created key so that the decrement done when the key is erased stays balanced.
            g_tablet_txn_info_txn_partitions_count << 1;
        }
        txn_tablet_map[key][tablet_info] = std::move(load_info);
        _insert_txn_partition_map_unlocked(transaction_id, partition_id);
        VLOG_NOTICE << "commit transaction to engine successfully."
                    << " partition_id: " << key.first << ", transaction_id: " << key.second
                    << ", tablet: " << tablet_info.to_string()
                    << ", rowsetid: " << rowset_ptr->rowset_id()
                    << ", version: " << rowset_ptr->version().first;
    }
    return Status::OK();
}
457
458
// remove a txn from txn manager
459
// Publishes the rowset committed for (partition_id, transaction_id, tablet): makes it visible at
// `version`, updates and saves the delete bitmap for merge-on-write txns, optionally adds it to
// the binlog, saves the rowset meta and finally removes the txn entry from the txn map.
//
// @param meta                    meta store used to persist the rowset meta
// @param partition_id, transaction_id, tablet_id, tablet_uid  identify the txn entry
// @param version                 version the rowset becomes visible at
// @param stats                   out: timing statistics; must not be null (DCHECK only)
// @param extend_tablet_txn_info  out: set to the txn info when the entry is found
// @param commit_tso              forwarded to Rowset::make_visible
//
// @return OK on success, and also when the tablet no longer exists in the tablet manager;
//         TRANSACTION_NOT_EXIST when no rowset is registered for the txn entry;
//         ROWSET_ADD_TO_BINLOG_FAILED when adding to the binlog fails; otherwise the failing
//         status of the delete bitmap or meta store operation.
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                               TTransactionId transaction_id, TTabletId tablet_id,
                               TabletUid tablet_uid, const Version& version,
                               TabletPublishStatistics* stats,
                               std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
                               const int64_t commit_tso) {
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        return Status::OK();
    }
    DCHECK(stats != nullptr);

    pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    RowsetSharedPtr rowset;
    std::shared_ptr<TabletTxnInfo> tablet_txn_info;
    int64_t t1 = MonotonicMicros();
    /// Step 1: get rowset, tablet_txn_info by key
    {
        std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
        std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
        stats->lock_wait_time_us += MonotonicMicros() - t1;

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
            auto& tablet_map = it->second;
            if (auto txn_info_iter = tablet_map.find(tablet_info);
                txn_info_iter != tablet_map.end()) {
                // Found the entry for (txn, tablet): keep shared ownership of the txn info and
                // its rowset so they stay alive after the locks are released.
                tablet_txn_info = txn_info_iter->second;
                extend_tablet_txn_info = tablet_txn_info;
                rowset = tablet_txn_info->rowset;
            }
        }
    }
    if (rowset == nullptr) {
        return Status::Error<TRANSACTION_NOT_EXIST>(
                "publish txn failed, rowset not found. partition_id={}, transaction_id={}, "
                "tablet={}, commit_tso={}",
                partition_id, transaction_id, tablet_info.to_string(), commit_tso);
    }
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug publish txn before save rs meta random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    /// Step 2: make rowset visible
    // save meta need access disk, it maybe very slow, so that it is not in global txn lock
    // it is under a single txn lock
    // TODO(ygl): rowset is already set version here, memory is changed, if save failed
    // it maybe a fatal error
    rowset->make_visible(version, commit_tso);

    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug publish txn after save rs meta random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });
    // update delete_bitmap
    if (tablet_txn_info->unique_key_merge_on_write) {
        int64_t t2 = MonotonicMicros();
        if (rowset->num_segments() > 1 &&
            !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments(
                    rowset->rowset_id())) {
            // delete bitmap is empty, should re-calculate delete bitmaps between segments
            std::vector<segment_v2::SegmentSharedPtr> segments;
            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
                    rowset->tablet_schema(), rowset->rowset_id(), segments,
                    tablet_txn_info->delete_bitmap));
        }

        RETURN_IF_ERROR(
                Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id));
        int64_t t3 = MonotonicMicros();
        stats->calc_delete_bitmap_time_us = t3 - t2;
        RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
                tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap,
                version.second));
        stats->save_meta_time_us = MonotonicMicros() - t3;
    }

    /// Step 3:  add to binlog
    auto enable_binlog = tablet->is_enable_binlog();
    if (enable_binlog) {
        auto binlog_status = rowset->add_to_binlog();
        if (!binlog_status.ok()) {
            return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>(
                    "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, "
                    "txn id: {}, status: {}",
                    rowset->rowset_id().to_string(), tablet_id, transaction_id,
                    binlog_status.to_string_no_stack());
        }
    }

    /// Step 4: save meta
    int64_t t5 = MonotonicMicros();
    auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(),
                                          rowset->rowset_meta()->get_rowset_pb(), enable_binlog);
    stats->save_meta_time_us += MonotonicMicros() - t5;
    if (!status.ok()) {
        status.append(fmt::format(", txn id: {}", transaction_id));
        return status;
    }

    if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info &&
        tablet_txn_info->partial_update_info->is_partial_update()) {
        // Best-effort cleanup: a failure here is only logged. It is kept in its own variable so
        // that it cannot leak into the value returned for an otherwise successful publish.
        auto remove_status = RowsetMetaManager::remove_partial_update_info(
                meta, tablet_id, partition_id, transaction_id);
        if (!remove_status.ok()) {
            LOG_WARNING(
                    "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, "
                    "tablet_id={}, tablet_uid={}",
                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
                    tablet_uid.to_string());
        }
    }

    // TODO(Drogon): remove these test codes
    if (enable_binlog) {
        auto version_str = fmt::format("{}", version.first);
        VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id,
                                  version_str, tablet->get_binlog_filepath(version_str));
    }

    /// Step 5: remove tablet_info from txn_tablet_map
    // If txn_tablet_map[key] becomes empty, the key itself is removed as well.
    int64_t t6 = MonotonicMicros();
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
    stats->lock_wait_time_us += MonotonicMicros() - t6;
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
                                     wrlock);
    VLOG_NOTICE << "publish txn successfully."
                << " partition_id: " << key.first << ", txn_id: " << key.second
                << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id()
                << ", version: " << version.first << "," << version.second;
    return Status::OK();
}
625
626
void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id,
627
                                                  TTransactionId transaction_id,
628
                                                  TTabletId tablet_id, TabletUid tablet_uid,
629
                                                  std::lock_guard<std::shared_mutex>& txn_lock,
630
15.5k
                                                  std::lock_guard<std::shared_mutex>& wrlock) {
631
15.5k
    std::pair<int64_t, int64_t> key {partition_id, transaction_id};
632
15.5k
    TabletInfo tablet_info {tablet_id, tablet_uid};
633
15.5k
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
634
15.5k
    if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
635
15.5k
        it->second.erase(tablet_info);
636
15.5k
        if (it->second.empty()) {
637
2.46k
            txn_tablet_map.erase(it);
638
2.46k
            g_tablet_txn_info_txn_partitions_count << -1;
639
2.46k
            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
640
2.46k
        }
641
15.5k
    }
642
15.5k
}
643
644
void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id,
645
0
                                        TTabletId tablet_id, TabletUid tablet_uid) {
646
0
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
647
0
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
648
0
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
649
0
                                     wrlock);
650
0
}
651
652
// txn could be rollbacked if it does not have related rowset
653
// if the txn has related rowset then could not rollback it, because it
654
// may be committed in another thread and our current thread meets errors when writing to data file
655
// BE has to wait for fe call clear txn api
656
Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
657
7
                                TTabletId tablet_id, TabletUid tablet_uid) {
658
7
    pair<int64_t, int64_t> key(partition_id, transaction_id);
659
7
    TabletInfo tablet_info(tablet_id, tablet_uid);
660
661
7
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
662
7
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
663
664
7
    auto it = txn_tablet_map.find(key);
665
7
    if (it == txn_tablet_map.end()) {
666
0
        return Status::OK();
667
0
    }
668
669
7
    auto& tablet_txn_info_map = it->second;
670
7
    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
671
7
        load_itr != tablet_txn_info_map.end()) {
672
        // found load for txn,tablet
673
        // case 1: user commit rowset, then the load id must be equal
674
7
        const auto& load_info = load_itr->second;
675
7
        if (load_info->rowset != nullptr) {
676
1
            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
677
1
                    "if rowset is not null, it means other thread may commit the rowset should "
678
1
                    "not delete txn any more");
679
1
        }
680
7
    }
681
682
6
    tablet_txn_info_map.erase(tablet_info);
683
6
    LOG(INFO) << "rollback transaction from engine successfully."
684
6
              << " partition_id: " << key.first << ", transaction_id: " << key.second
685
6
              << ", tablet: " << tablet_info.to_string();
686
6
    if (tablet_txn_info_map.empty()) {
687
6
        txn_tablet_map.erase(it);
688
6
        g_tablet_txn_info_txn_partitions_count << -1;
689
6
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
690
6
    }
691
6
    return Status::OK();
692
7
}
693
694
// fe call this api to clear unused rowsets in be
695
// could not delete the rowset if it already has a valid version
696
Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
697
                              TTransactionId transaction_id, TTabletId tablet_id,
698
276
                              TabletUid tablet_uid) {
699
276
    pair<int64_t, int64_t> key(partition_id, transaction_id);
700
276
    TabletInfo tablet_info(tablet_id, tablet_uid);
701
276
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
702
276
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
703
276
    auto it = txn_tablet_map.find(key);
704
276
    if (it == txn_tablet_map.end()) {
705
0
        return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map");
706
0
    }
707
276
    Status st = Status::OK();
708
276
    auto load_itr = it->second.find(tablet_info);
709
276
    if (load_itr != it->second.end()) {
710
        // found load for txn,tablet
711
        // case 1: user commit rowset, then the load id must be equal
712
276
        auto& load_info = load_itr->second;
713
276
        auto& rowset = load_info->rowset;
714
276
        if (rowset != nullptr && meta != nullptr) {
715
275
            if (!rowset->is_pending()) {
716
1
                st = Status::Error<TRANSACTION_ALREADY_COMMITTED>(
717
1
                        "could not delete transaction from engine, just remove it from memory not "
718
1
                        "delete from disk, because related rowset already published. partition_id: "
719
1
                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
720
1
                        key.first, key.second, tablet_info.to_string(),
721
1
                        rowset->rowset_id().to_string(), rowset->version().to_string(),
722
1
                        RowsetStatePB_Name(rowset->rowset_meta_state()));
723
274
            } else {
724
274
                static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
725
274
#ifndef BE_TEST
726
274
                _engine.add_unused_rowset(rowset);
727
274
#endif
728
274
                VLOG_NOTICE << "delete transaction from engine successfully."
729
2
                            << " partition_id: " << key.first << ", transaction_id: " << key.second
730
2
                            << ", tablet: " << tablet_info.to_string() << ", rowset: "
731
2
                            << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
732
274
            }
733
275
        }
734
276
        it->second.erase(load_itr);
735
276
    }
736
276
    if (it->second.empty()) {
737
18
        txn_tablet_map.erase(it);
738
18
        g_tablet_txn_info_txn_partitions_count << -1;
739
18
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
740
18
    }
741
276
    return st;
742
276
}
743
744
void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid,
745
                                         int64_t* partition_id,
746
6
                                         std::set<int64_t>* transaction_ids) {
747
6
    if (partition_id == nullptr || transaction_ids == nullptr) {
748
0
        LOG(WARNING) << "parameter is null when get transactions by tablet";
749
0
        return;
750
0
    }
751
752
6
    TabletInfo tablet_info(tablet_id, tablet_uid);
753
12
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
754
6
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
755
6
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
756
6
        for (auto& it : txn_tablet_map) {
757
1
            if (it.second.find(tablet_info) != it.second.end()) {
758
1
                *partition_id = it.first.first;
759
1
                transaction_ids->insert(it.first.second);
760
1
                VLOG_NOTICE << "find transaction on tablet."
761
1
                            << "partition_id: " << it.first.first
762
1
                            << ", transaction_id: " << it.first.second
763
1
                            << ", tablet: " << tablet_info.to_string();
764
1
            }
765
1
        }
766
6
    }
767
6
}
768
769
// force drop all txns related with the tablet
770
// maybe lock error, because not get txn lock before remove from meta
771
void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
772
3.28k
                                                    TabletUid tablet_uid) {
773
3.28k
    TabletInfo tablet_info(tablet_id, tablet_uid);
774
3.27M
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
775
3.27M
        std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]);
776
3.27M
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
777
3.28M
        for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
778
5.58k
            auto load_itr = it->second.find(tablet_info);
779
5.58k
            if (load_itr != it->second.end()) {
780
22
                auto& load_info = load_itr->second;
781
22
                auto& rowset = load_info->rowset;
782
22
                if (rowset != nullptr && meta != nullptr) {
783
22
                    LOG(INFO) << " delete transaction from engine "
784
22
                              << ", tablet: " << tablet_info.to_string()
785
22
                              << ", rowset id: " << rowset->rowset_id();
786
22
                    static_cast<void>(
787
22
                            RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
788
22
                }
789
22
                LOG(INFO) << "remove tablet related txn."
790
22
                          << " partition_id: " << it->first.first
791
22
                          << ", transaction_id: " << it->first.second
792
22
                          << ", tablet: " << tablet_info.to_string() << ", rowset: "
793
22
                          << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
794
22
                it->second.erase(load_itr);
795
22
            }
796
5.58k
            if (it->second.empty()) {
797
4
                _clear_txn_partition_map_unlocked(it->first.second, it->first.first);
798
4
                it = txn_tablet_map.erase(it);
799
4
                g_tablet_txn_info_txn_partitions_count << -1;
800
5.57k
            } else {
801
5.57k
                ++it;
802
5.57k
            }
803
5.58k
        }
804
3.27M
    }
805
3.28k
    if (meta != nullptr) {
806
3.28k
        Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
807
3.28k
        if (!st.ok()) {
808
0
            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id,
809
0
                        st.to_string());
810
0
        }
811
3.28k
    }
812
3.28k
}
813
814
void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
815
                                         TPartitionId partition_id,
816
2.49k
                                         std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
817
    // get tablets in this transaction
818
2.49k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
819
2.49k
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
820
2.49k
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
821
2.49k
    auto it = txn_tablet_map.find(key);
822
2.49k
    if (it == txn_tablet_map.end()) {
823
2
        VLOG_NOTICE << "could not find tablet for"
824
1
                    << " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
825
2
        return;
826
2
    }
827
2.49k
    auto& load_info_map = it->second;
828
829
    // each tablet
830
15.8k
    for (auto& load_info : load_info_map) {
831
15.8k
        const TabletInfo& tablet_info = load_info.first;
832
        // must not check rowset == null here, because if rowset == null
833
        // publish version should failed
834
15.8k
        tablet_infos->emplace(tablet_info, load_info.second->rowset);
835
15.8k
    }
836
2.49k
}
837
838
52
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
839
53.3k
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
840
53.2k
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
841
53.2k
        for (auto& it : _txn_tablet_maps[i]) {
842
912
            for (auto& tablet_load_it : it.second) {
843
912
                tablet_infos->emplace(tablet_load_it.first);
844
912
            }
845
58
        }
846
53.2k
    }
847
52
}
848
849
void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
850
690
        const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) {
851
700k
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
852
699k
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
853
699k
        for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) {
854
1.01k
            auto tablet_load_it = load_info_map.find(tablet.get_tablet_info());
855
1.01k
            if (tablet_load_it != load_info_map.end()) {
856
18
                const auto& [_, load_info] = *tablet_load_it;
857
18
                const auto& rowset = load_info->rowset;
858
18
                const auto& delete_bitmap = load_info->delete_bitmap;
859
18
                if (!rowset || !delete_bitmap) {
860
11
                    continue;
861
11
                }
862
7
                commit_tablet_txn_info_vec->push_back({
863
7
                        .transaction_id = txn_key.second,
864
7
                        .partition_id = txn_key.first,
865
7
                        .delete_bitmap = delete_bitmap,
866
7
                        .rowset_ids = load_info->rowset_ids,
867
7
                        .partial_update_info = load_info->partial_update_info,
868
7
                });
869
7
            }
870
1.01k
        }
871
699k
    }
872
690
}
873
874
143
void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
875
143
    int64_t now = UnixSeconds();
876
    // traverse the txn map, and get all expired txns
877
146k
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
878
146k
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
879
146k
        for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) {
880
153
            auto txn_id = txn_key.second;
881
2.47k
            for (auto&& [tablet_info, txn_info] : tablet_txn_infos) {
882
2.47k
                double diff = difftime(now, txn_info->creation_time);
883
2.47k
                if (diff < config::pending_data_expire_time_sec) {
884
2.45k
                    continue;
885
2.45k
                }
886
887
20
                (*expire_txn_map)[tablet_info].push_back(txn_id);
888
20
                if (VLOG_IS_ON(3)) {
889
0
                    VLOG_NOTICE << "find expired txn."
890
0
                                << " tablet=" << tablet_info.to_string()
891
0
                                << " transaction_id=" << txn_id << " exist_sec=" << diff;
892
0
                }
893
20
            }
894
153
        }
895
146k
    }
896
143
}
897
898
void TxnManager::get_partition_ids(const TTransactionId transaction_id,
899
14
                                   std::vector<TPartitionId>* partition_ids) {
900
14
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
901
14
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
902
14
    auto it = txn_partition_map.find(transaction_id);
903
14
    if (it != txn_partition_map.end()) {
904
13
        for (int64_t partition_id : it->second) {
905
13
            partition_ids->push_back(partition_id);
906
13
        }
907
13
    }
908
14
}
909
910
31.8k
void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
911
31.8k
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
912
31.8k
    auto find = txn_partition_map.find(transaction_id);
913
31.8k
    if (find == txn_partition_map.end()) {
914
2.49k
        txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
915
2.49k
    }
916
31.8k
    txn_partition_map[transaction_id].insert(partition_id);
917
31.8k
}
918
919
2.49k
void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
920
2.49k
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
921
2.49k
    auto it = txn_partition_map.find(transaction_id);
922
2.49k
    if (it != txn_partition_map.end()) {
923
2.49k
        it->second.erase(partition_id);
924
2.49k
        if (it->second.empty()) {
925
2.46k
            txn_partition_map.erase(it);
926
2.46k
        }
927
2.49k
    }
928
2.49k
}
929
930
void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
931
0
                                             DeltaWriter* delta_writer) {
932
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
933
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
934
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
935
0
            _get_txn_tablet_delta_writer_map(transaction_id);
936
0
    auto find = txn_tablet_delta_writer_map.find(transaction_id);
937
0
    if (find == txn_tablet_delta_writer_map.end()) {
938
0
        txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>();
939
0
    }
940
0
    txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer;
941
0
}
942
943
void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id,
944
0
                                                 int64_t node_id, bool is_succeed) {
945
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
946
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
947
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
948
0
            _get_txn_tablet_delta_writer_map(transaction_id);
949
0
    auto find_txn = txn_tablet_delta_writer_map.find(transaction_id);
950
0
    if (find_txn == txn_tablet_delta_writer_map.end()) {
951
0
        LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id
952
0
                     << ", tablet_id=" << tablet_id;
953
0
        return;
954
0
    }
955
0
    auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id);
956
0
    if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) {
957
0
        LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id
958
0
                     << ", tablet_id=" << tablet_id;
959
0
        return;
960
0
    }
961
0
    DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id];
962
0
    delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed);
963
0
}
964
965
0
void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
966
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
967
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
968
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
969
0
            _get_txn_tablet_delta_writer_map(transaction_id);
970
0
    auto it = txn_tablet_delta_writer_map.find(transaction_id);
971
0
    if (it != txn_tablet_delta_writer_map.end()) {
972
0
        txn_tablet_delta_writer_map.erase(it);
973
0
    }
974
0
    VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
975
0
}
976
977
5.96k
int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) {
978
5.96k
    char key[16];
979
5.96k
    memcpy(key, &tablet_id, sizeof(int64_t));
980
5.96k
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
981
5.96k
    CacheKey cache_key((const char*)&key, sizeof(key));
982
983
5.96k
    auto* handle = _tablet_version_cache->lookup(cache_key);
984
5.96k
    if (handle == nullptr) {
985
5.95k
        return -1;
986
5.95k
    }
987
13
    int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value;
988
13
    _tablet_version_cache->release(handle);
989
13
    return res;
990
5.96k
}
991
992
5.95k
void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) {
993
5.95k
    char key[16];
994
5.95k
    memcpy(key, &tablet_id, sizeof(int64_t));
995
5.95k
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
996
5.95k
    CacheKey cache_key((const char*)&key, sizeof(key));
997
998
5.95k
    auto* value = new CacheValue;
999
5.95k
    value->value = txn_id;
1000
5.95k
    auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id),
1001
5.95k
                                                 CachePriority::NORMAL);
1002
5.95k
    _tablet_version_cache->release(handle);
1003
5.95k
}
1004
1005
// Returns the state stored for the tablet's load under
// (partition_id, transaction_id), or TxnState::NOT_FOUND when either the key
// or the tablet has no entry. Reads under the shared txn-map lock.
TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
                                   TTabletId tablet_id, TabletUid tablet_uid) {
    std::pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);

    std::shared_lock map_guard(_get_txn_map_lock(transaction_id));
    auto& shard_map = _get_txn_tablet_map(transaction_id);

    if (auto partition_it = shard_map.find(key); partition_it != shard_map.end()) {
        auto& loads = partition_it->second;
        if (auto load_it = loads.find(tablet_info); load_it != loads.end()) {
            return load_it->second->state;
        }
    }
    return TxnState::NOT_FOUND;
}
1027
1028
} // namespace doris