Coverage Report

Created: 2024-11-20 21:49

/root/doris/be/src/olap/txn_manager.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "txn_manager.h"
19
20
#include <fmt/format.h>
21
#include <fmt/ranges.h>
22
#include <thrift/protocol/TDebugProtocol.h>
23
#include <time.h>
24
25
#include <filesystem>
26
#include <iterator>
27
#include <list>
28
#include <new>
29
#include <ostream>
30
#include <queue>
31
#include <set>
32
#include <string>
33
34
#include "common/config.h"
35
#include "common/logging.h"
36
#include "common/status.h"
37
#include "olap/data_dir.h"
38
#include "olap/delta_writer.h"
39
#include "olap/olap_common.h"
40
#include "olap/partial_update_info.h"
41
#include "olap/rowset/pending_rowset_helper.h"
42
#include "olap/rowset/rowset_meta.h"
43
#include "olap/rowset/rowset_meta_manager.h"
44
#include "olap/schema_change.h"
45
#include "olap/segment_loader.h"
46
#include "olap/storage_engine.h"
47
#include "olap/tablet_manager.h"
48
#include "olap/tablet_meta.h"
49
#include "olap/tablet_meta_manager.h"
50
#include "olap/task/engine_publish_version_task.h"
51
#include "util/debug_points.h"
52
#include "util/time.h"
53
54
namespace doris {
55
class OlapMeta;
56
} // namespace doris
57
58
using std::map;
59
using std::pair;
60
using std::set;
61
using std::string;
62
using std::stringstream;
63
using std::vector;
64
65
namespace doris {
66
using namespace ErrorCode;
67
68
TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
        : _engine(engine),
          _txn_map_shard_size(txn_map_shard_size),
          _txn_shard_size(txn_shard_size) {
    // Both shard counts must be positive powers of two: for x > 0, (x & (x - 1)) == 0 holds
    // only when x is a power of two. (Presumably so a txn id can be mapped to a shard with a
    // bit mask — confirm against the shard-selection helpers.)
    DCHECK_GT(_txn_map_shard_size, 0);
    DCHECK_GT(_txn_shard_size, 0);
    DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
    DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);

    // Arrays sized by the txn-map shard count: map locks, tablet maps, partition maps and the
    // delta-writer maps with their locks. These are owned raw arrays; NOTE(review): they are
    // presumably released with delete[] in the destructor (not visible here).
    const auto map_shards = _txn_map_shard_size;
    _txn_map_locks = new std::shared_mutex[map_shards];
    _txn_tablet_maps = new txn_tablet_map_t[map_shards];
    _txn_partition_maps = new txn_partition_map_t[map_shards];
    _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[map_shards];
    _txn_tablet_delta_writer_map_locks = new std::shared_mutex[map_shards];

    // Per-txn locks use their own, independently sized shard array.
    _txn_mutex = new std::shared_mutex[_txn_shard_size];

    // For debugging.
    _tablet_version_cache = std::make_unique<TabletVersionCache>(100000);
}
85
86
// prepare txn should always be allowed because ingest task will be retried
87
// could not distinguish rollup, schema change or base table, prepare txn successfully will allow
88
// ingest retried
89
Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet,
90
                               TTransactionId transaction_id, const PUniqueId& load_id,
91
25
                               bool ingest) {
92
    // check if the tablet has already been shutdown. If it has, it indicates that
93
    // it is an old tablet, and data should not be imported into the old tablet.
94
    // Otherwise, it may lead to data loss during migration.
95
25
    if (tablet.tablet_state() == TABLET_SHUTDOWN) {
96
0
        return Status::InternalError<false>(
97
0
                "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped "
98
0
                "or migrationed. Please check if the table has been dropped or try again.",
99
0
                tablet.tablet_id());
100
0
    }
101
25
    return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(),
102
25
                       load_id, ingest);
103
25
}
104
105
// most used for ut
106
Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
107
                               TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
108
33
                               bool ingest) {
109
33
    TxnKey key(partition_id, transaction_id);
110
33
    TabletInfo tablet_info(tablet_id, tablet_uid);
111
33
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
112
33
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
113
114
33
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
115
33
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
116
33
            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
117
33
                    .tag("txn_id", transaction_id)
118
33
                    .tag("tablet_id", tablet_id);
119
33
            return Status::InternalError("debug prepare txn random failed");
120
33
        }
121
33
    });
122
33
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
123
33
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
124
33
            LOG_WARNING("TxnManager.prepare_txn.wait")
125
33
                    .tag("txn_id", transaction_id)
126
33
                    .tag("tablet_id", tablet_id)
127
33
                    .tag("wait ms", wait);
128
33
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
129
33
        }
130
33
    });
131
132
    /// Step 1: check if the transaction is already exist
133
33
    do {
134
33
        auto iter = txn_tablet_map.find(key);
135
33
        if (iter == txn_tablet_map.end()) {
136
29
            break;
137
29
        }
138
139
        // exist TxnKey
140
4
        auto& txn_tablet_info_map = iter->second;
141
4
        auto load_itr = txn_tablet_info_map.find(tablet_info);
142
4
        if (load_itr == txn_tablet_info_map.end()) {
143
2
            break;
144
2
        }
145
146
        // found load for txn,tablet
147
2
        auto& load_info = load_itr->second;
148
        // case 1: user commit rowset, then the load id must be equal
149
        // check if load id is equal
150
2
        auto& load_id = load_info->load_id;
151
2
        if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() &&
152
2
            load_info->rowset != nullptr) {
153
1
            LOG(WARNING) << "find transaction exists when add to engine."
154
1
                         << "partition_id: " << key.first << ", transaction_id: " << key.second
155
1
                         << ", tablet: " << tablet_info.to_string();
156
1
            return Status::OK();
157
1
        }
158
2
    } while (false);
159
160
    /// Step 2: check if there are too many transactions on running.
161
    // check if there are too many transactions on running.
162
    // if yes, reject the request.
163
32
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
164
32
    if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
165
0
        return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}",
166
0
                                                    txn_tablet_map.size(),
167
0
                                                    config::max_runnings_transactions_per_txn_map);
168
0
    }
169
170
    /// Step 3: Add transaction to engine
171
    // not found load id
172
    // case 1: user start a new txn, rowset = null
173
    // case 2: loading txn from meta env
174
32
    auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest);
175
32
    load_info->prepare();
176
32
    txn_tablet_map[key][tablet_info] = std::move(load_info);
177
32
    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
178
32
    VLOG_NOTICE << "add transaction to engine successfully."
179
0
                << "partition_id: " << key.first << ", transaction_id: " << key.second
180
0
                << ", tablet: " << tablet_info.to_string();
181
32
    return Status::OK();
182
32
}
183
184
// Commit `rowset_ptr` as the result of load `load_id` for (partition_id, transaction_id) on
// `tablet`. Convenience overload: resolves the tablet's meta store, id and uid, then forwards
// to the OlapMeta-based commit_txn.
Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
                              TTransactionId transaction_id, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    // Both `guard` and `partial_update_info` are by-value sinks. Move them on rather than
    // copying; for the shared_ptr, a copy would cost an extra atomic ref-count inc/dec.
    return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id,
                      tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr,
                      std::move(guard), is_recovery, std::move(partial_update_info));
}
193
194
// Publish `version` of the rowset committed by (partition_id, transaction_id) on `tablet`.
// Convenience overload: forwards to the OlapMeta-based publish_txn using the tablet's meta
// store, id and uid. `stats` is passed through unchanged.
Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                               TTransactionId transaction_id, const Version& version,
                               TabletPublishStatistics* stats) {
    auto* meta = tablet->data_dir()->get_meta();
    return publish_txn(meta, partition_id, transaction_id, tablet->tablet_id(),
                       tablet->tablet_uid(), version, stats);
}
200
201
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
202
0
                           TTabletId tablet_id, TabletUid tablet_uid) {
203
0
    pair<int64_t, int64_t> key(partition_id, transaction_id);
204
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
205
206
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
207
208
0
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
209
0
    auto it = txn_tablet_map.find(key);
210
0
    if (it == txn_tablet_map.end()) {
211
0
        return;
212
0
    }
213
214
0
    auto& tablet_txn_info_map = it->second;
215
0
    auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
216
0
    if (tablet_txn_info_iter == tablet_txn_info_map.end()) {
217
0
        return;
218
0
    }
219
220
0
    auto& txn_info = tablet_txn_info_iter->second;
221
0
    txn_info->abort();
222
0
}
223
224
// delete the txn from manager if it is not committed(not have a valid rowset)
225
Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet,
226
5
                                TTransactionId transaction_id) {
227
5
    return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid());
228
5
}
229
230
// Delete the txn of (partition_id, transaction_id) on `tablet`. Convenience overload that
// forwards to the OlapMeta-based delete_txn using the tablet's meta store, id and uid.
Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                              TTransactionId transaction_id) {
    auto* meta = tablet->data_dir()->get_meta();
    return delete_txn(meta, partition_id, transaction_id, tablet->tablet_id(),
                      tablet->tablet_uid());
}
235
236
void TxnManager::set_txn_related_delete_bitmap(
237
        TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
238
        TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap,
239
        const RowsetIdUnorderedSet& rowset_ids,
240
5
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
241
5
    pair<int64_t, int64_t> key(partition_id, transaction_id);
242
5
    TabletInfo tablet_info(tablet_id, tablet_uid);
243
244
5
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
245
5
    {
246
        // get tx
247
5
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
248
5
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
249
5
        auto it = txn_tablet_map.find(key);
250
5
        if (it == txn_tablet_map.end()) {
251
1
            LOG(WARNING) << "transaction_id: " << transaction_id
252
1
                         << " partition_id: " << partition_id << " may be cleared";
253
1
            return;
254
1
        }
255
4
        auto load_itr = it->second.find(tablet_info);
256
4
        if (load_itr == it->second.end()) {
257
0
            LOG(WARNING) << "transaction_id: " << transaction_id
258
0
                         << " partition_id: " << partition_id << " tablet_id: " << tablet_id
259
0
                         << " may be cleared";
260
0
            return;
261
0
        }
262
4
        auto& load_info = load_itr->second;
263
4
        load_info->unique_key_merge_on_write = unique_key_merge_on_write;
264
4
        load_info->delete_bitmap = delete_bitmap;
265
4
        load_info->rowset_ids = rowset_ids;
266
4
        load_info->partial_update_info = partial_update_info;
267
4
    }
268
4
}
269
270
// Commit `rowset_ptr` as the result of load `load_id` for (partition_id, transaction_id) on the
// tablet identified by (tablet_id, tablet_uid).
//
// Flow:
//  1. Validate the ids and the rowset pointer.
//  2. Under the per-txn lock (held until return), inspect an existing committed entry:
//     - same load id and same rowset id -> duplicate call: adopt `guard`, return OK;
//     - same load id, different rowset  -> PUSH_TRANSACTION_ALREADY_EXIST;
//     - anything else                   -> fall through and (re)register the rowset.
//  3. Unless `is_recovery`, persist the rowset meta and, for partial updates, the
//     partial-update info to `meta`.
//  4. In recovery mode, for merge-on-write tablets, reload any persisted partial-update info.
//  5. Under the exclusive txn-map lock, store a committed TabletTxnInfo for this txn/tablet.
Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                              TTransactionId transaction_id, TTabletId tablet_id,
                              TabletUid tablet_uid, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
        LOG(WARNING) << "invalid commit req "
                     << " partition_id=" << partition_id << " transaction_id=" << transaction_id
                     << " tablet_id=" << tablet_id;
        return Status::InternalError("invalid partition id");
    }

    pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    if (rowset_ptr == nullptr) {
        return Status::Error<ROWSET_INVALID>(
                "could not commit txn because rowset ptr is null. partition_id: {}, "
                "transaction_id: {}, tablet: {}",
                key.first, key.second, tablet_info.to_string());
    }

    DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.commit_txn.random_failed")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug commit txn random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.commit_txn.wait")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    // Exclusive per-txn lock, held for the rest of the function (including the meta
    // persistence below).
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
    // This loop body runs exactly once; it exists only so that `break` can skip ahead.
    // A `break` means "no conflicting committed entry, continue with the commit".
    do {
        // Shared lock on the txn-map shard while inspecting the existing entry.
        std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
        auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
        // TODO(dx): remove log after fix partition id eq 0 bug
        if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
            rowset_ptr->rowset_meta()->set_partition_id(partition_id);
            LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
                         << partition_id;
        }
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        auto it = txn_tablet_map.find(key);
        if (it == txn_tablet_map.end()) {
            break;
        }

        auto load_itr = it->second.find(tablet_info);
        if (load_itr == it->second.end()) {
            break;
        }

        // Found a load for this txn/tablet.
        auto& load_info = load_itr->second;
        // Only prepared, not yet committed: nothing to conflict with.
        if (load_info->rowset == nullptr) {
            break;
        }

        // A different load committed this txn/tablet: proceed and overwrite it below.
        if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) {
            break;
        }

        // Same load and same rowset id: this is a duplicate commit call.
        if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) {
            LOG(INFO) << "find rowset exists when commit transaction to engine."
                      << "partition_id: " << key.first << ", transaction_id: " << key.second
                      << ", tablet: " << tablet_info.to_string()
                      << ", rowset_id: " << load_info->rowset->rowset_id();
            // Keep the new guard on the entry so this rowset is not removed from the pending
            // rowsets when `guard` would otherwise be destroyed.
            load_info->pending_rs_guard = std::move(guard);
            return Status::OK();
        }

        // Same load but a different rowset id: this should not happen, so report an error.
        return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
                "find rowset exists when commit transaction to engine. but rowset ids are not "
                "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new "
                "rowset_id: {}",
                key.first, key.second, tablet_info.to_string(),
                load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string());
    } while (false);

    // Outside recovery mode, persist the rowset meta to the meta env. Saving meta touches disk
    // and may be slow, so it happens under the per-txn lock only, not the txn-map lock.
    if (!is_recovery) {
        Status save_status =
                RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
                                        rowset_ptr->rowset_meta()->get_rowset_pb(), false);
        DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
            if (auto wait = dp->param<int>("duration", 0); wait > 0) {
                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
                        .tag("txn_id", transaction_id)
                        .tag("tablet_id", tablet_id)
                        .tag("wait ms", wait);
                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
            }
        });
        if (!save_status.ok()) {
            save_status.append(fmt::format(", txn id: {}", transaction_id));
            return save_status;
        }

        if (partial_update_info && partial_update_info->is_partial_update()) {
            PartialUpdateInfoPB partial_update_info_pb;
            partial_update_info->to_pb(&partial_update_info_pb);
            save_status = RowsetMetaManager::save_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, partial_update_info_pb);
            if (!save_status.ok()) {
                save_status.append(fmt::format(", txn_id: {}", transaction_id));
                return save_status;
            }
        }
    }

    TabletSharedPtr tablet;
    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
    if (is_recovery) {
        tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
            PartialUpdateInfoPB partial_update_info_pb;
            auto st = RowsetMetaManager::try_get_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb);
            if (st.ok()) {
                // Partial-update info was persisted for this load: rebuild it from the pb.
                decoded_partial_update_info = std::make_shared<PartialUpdateInfo>();
                decoded_partial_update_info->from_pb(&partial_update_info_pb);
                DCHECK(decoded_partial_update_info->is_partial_update());
            } else if (!st.is<META_KEY_NOT_FOUND>()) {
                // META_KEY_NOT_FOUND only means the load is not a partial update; any other
                // error is a real failure while reading the meta.
                return st;
            }
        }
    }

    {
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
        auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
        load_info->pending_rs_guard = std::move(guard);
        if (is_recovery) {
            if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
                load_info->unique_key_merge_on_write = true;
                load_info->delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
                if (decoded_partial_update_info) {
                    LOG_INFO(
                            "get partial update info from RocksDB during recovery. txn_id={}, "
                            "partition_id={}, tablet_id={}, partial_update_info=[{}]",
                            transaction_id, partition_id, tablet_id,
                            decoded_partial_update_info->summary());
                    load_info->partial_update_info = decoded_partial_update_info;
                }
            }
        }
        load_info->commit();

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        txn_tablet_map[key][tablet_info] = std::move(load_info);
        _insert_txn_partition_map_unlocked(transaction_id, partition_id);
        VLOG_NOTICE << "commit transaction to engine successfully."
                    << " partition_id: " << key.first << ", transaction_id: " << key.second
                    << ", tablet: " << tablet_info.to_string()
                    << ", rowsetid: " << rowset_ptr->rowset_id()
                    << ", version: " << rowset_ptr->version().first;
    }
    return Status::OK();
}
448
449
// remove a txn from txn manager
450
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
451
                               TTransactionId transaction_id, TTabletId tablet_id,
452
                               TabletUid tablet_uid, const Version& version,
453
12
                               TabletPublishStatistics* stats) {
454
12
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
455
12
    if (tablet == nullptr) {
456
0
        return Status::OK();
457
0
    }
458
12
    DCHECK(stats != nullptr);
459
460
12
    pair<int64_t, int64_t> key(partition_id, transaction_id);
461
12
    TabletInfo tablet_info(tablet_id, tablet_uid);
462
12
    RowsetSharedPtr rowset;
463
12
    std::shared_ptr<TabletTxnInfo> tablet_txn_info;
464
12
    int64_t t1 = MonotonicMicros();
465
    /// Step 1: get rowset, tablet_txn_info by key
466
12
    {
467
12
        std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
468
12
        std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
469
12
        stats->lock_wait_time_us += MonotonicMicros() - t1;
470
471
12
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
472
12
        if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
473
11
            auto& tablet_map = it->second;
474
11
            if (auto txn_info_iter = tablet_map.find(tablet_info);
475
11
                txn_info_iter != tablet_map.end()) {
476
                // found load for txn,tablet
477
                // case 1: user commit rowset, then the load id must be equal
478
11
                tablet_txn_info = txn_info_iter->second;
479
11
                rowset = tablet_txn_info->rowset;
480
11
            }
481
11
        }
482
12
    }
483
12
    if (rowset == nullptr) {
484
1
        return Status::Error<TRANSACTION_NOT_EXIST>(
485
1
                "publish txn failed, rowset not found. partition_id={}, transaction_id={}, "
486
1
                "tablet={}",
487
1
                partition_id, transaction_id, tablet_info.to_string());
488
1
    }
489
11
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
490
11
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
491
11
            LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
492
11
                    .tag("txn_id", transaction_id)
493
11
                    .tag("tablet_id", tablet_id);
494
11
            return Status::InternalError("debug publish txn before save rs meta random failed");
495
11
        }
496
11
    });
497
11
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
498
11
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
499
11
            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
500
11
                    .tag("txn_id", transaction_id)
501
11
                    .tag("tablet_id", tablet_id)
502
11
                    .tag("wait ms", wait);
503
11
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
504
11
        }
505
11
    });
506
507
    /// Step 2: make rowset visible
508
    // save meta need access disk, it maybe very slow, so that it is not in global txn lock
509
    // it is under a single txn lock
510
    // TODO(ygl): rowset is already set version here, memory is changed, if save failed
511
    // it maybe a fatal error
512
11
    rowset->make_visible(version);
513
514
11
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
515
11
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
516
11
            LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
517
11
                    .tag("txn_id", transaction_id)
518
11
                    .tag("tablet_id", tablet_id);
519
11
            return Status::InternalError("debug publish txn after save rs meta random failed");
520
11
        }
521
11
    });
522
11
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
523
11
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
524
11
            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
525
11
                    .tag("txn_id", transaction_id)
526
11
                    .tag("tablet_id", tablet_id)
527
11
                    .tag("wait ms", wait);
528
11
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
529
11
        }
530
11
    });
531
    // update delete_bitmap
532
11
    if (tablet_txn_info->unique_key_merge_on_write) {
533
3
        int64_t t2 = MonotonicMicros();
534
3
        RETURN_IF_ERROR(
535
3
                Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id));
536
3
        int64_t t3 = MonotonicMicros();
537
3
        stats->calc_delete_bitmap_time_us = t3 - t2;
538
3
        RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
539
3
                tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap,
540
3
                version.second));
541
3
        stats->save_meta_time_us = MonotonicMicros() - t3;
542
3
    }
543
544
    /// Step 3:  add to binlog
545
11
    auto enable_binlog = tablet->is_enable_binlog();
546
11
    if (enable_binlog) {
547
0
        auto status = rowset->add_to_binlog();
548
0
        if (!status.ok()) {
549
0
            return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>(
550
0
                    "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, "
551
0
                    "txn id: {}",
552
0
                    rowset->rowset_id().to_string(), tablet_id, transaction_id);
553
0
        }
554
0
    }
555
556
    /// Step 4: save meta
557
11
    int64_t t5 = MonotonicMicros();
558
11
    auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(),
559
11
                                          rowset->rowset_meta()->get_rowset_pb(), enable_binlog);
560
11
    stats->save_meta_time_us += MonotonicMicros() - t5;
561
11
    if (!status.ok()) {
562
0
        status.append(fmt::format(", txn id: {}", transaction_id));
563
0
        return status;
564
0
    }
565
566
11
    if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info &&
567
11
        tablet_txn_info->partial_update_info->is_partial_update()) {
568
0
        status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id,
569
0
                                                               transaction_id);
570
0
        if (!status) {
571
            // discard the error status and print the warning log
572
0
            LOG_WARNING(
573
0
                    "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, "
574
0
                    "tablet_id={}, tablet_uid={}",
575
0
                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
576
0
                    tablet_uid.to_string());
577
0
        }
578
0
    }
579
580
    // TODO(Drogon): remove these test codes
581
11
    if (enable_binlog) {
582
0
        auto version_str = fmt::format("{}", version.first);
583
0
        VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id,
584
0
                                  version_str, tablet->get_binlog_filepath(version_str));
585
0
    }
586
587
    /// Step 5: remove tablet_info from tnx_tablet_map
588
    // txn_tablet_map[key] empty, remove key from txn_tablet_map
589
11
    int64_t t6 = MonotonicMicros();
590
11
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
591
11
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
592
11
    stats->lock_wait_time_us += MonotonicMicros() - t6;
593
11
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
594
11
    if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
595
11
        it->second.erase(tablet_info);
596
11
        VLOG_NOTICE << "publish txn successfully."
597
0
                    << " partition_id: " << key.first << ", txn_id: " << key.second
598
0
                    << ", tablet_id: " << tablet_info.tablet_id
599
0
                    << ", rowsetid: " << rowset->rowset_id() << ", version: " << version.first
600
0
                    << "," << version.second;
601
11
        if (it->second.empty()) {
602
11
            txn_tablet_map.erase(it);
603
11
            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
604
11
        }
605
11
    }
606
607
11
    return status;
608
11
}
609
610
// txn could be rollbacked if it does not have related rowset
611
// if the txn has related rowset then could not rollback it, because it
612
// may be committed in another thread and our current thread meets errors when writing to data file
613
// BE has to wait for fe call clear txn api
614
Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
615
7
                                TTabletId tablet_id, TabletUid tablet_uid) {
616
7
    pair<int64_t, int64_t> key(partition_id, transaction_id);
617
7
    TabletInfo tablet_info(tablet_id, tablet_uid);
618
619
7
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
620
7
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
621
622
7
    auto it = txn_tablet_map.find(key);
623
7
    if (it == txn_tablet_map.end()) {
624
0
        return Status::OK();
625
0
    }
626
627
7
    auto& tablet_txn_info_map = it->second;
628
7
    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
629
7
        load_itr != tablet_txn_info_map.end()) {
630
        // found load for txn,tablet
631
        // case 1: user commit rowset, then the load id must be equal
632
7
        const auto& load_info = load_itr->second;
633
7
        if (load_info->rowset != nullptr) {
634
1
            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
635
1
                    "if rowset is not null, it means other thread may commit the rowset should "
636
1
                    "not delete txn any more");
637
1
        }
638
7
    }
639
640
6
    tablet_txn_info_map.erase(tablet_info);
641
6
    LOG(INFO) << "rollback transaction from engine successfully."
642
6
              << " partition_id: " << key.first << ", transaction_id: " << key.second
643
6
              << ", tablet: " << tablet_info.to_string();
644
6
    if (tablet_txn_info_map.empty()) {
645
6
        txn_tablet_map.erase(it);
646
6
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
647
6
    }
648
6
    return Status::OK();
649
7
}
650
651
// fe call this api to clear unused rowsets in be
652
// could not delete the rowset if it already has a valid version
653
Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
654
                              TTransactionId transaction_id, TTabletId tablet_id,
655
3
                              TabletUid tablet_uid) {
656
3
    pair<int64_t, int64_t> key(partition_id, transaction_id);
657
3
    TabletInfo tablet_info(tablet_id, tablet_uid);
658
3
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
659
3
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
660
3
    auto it = txn_tablet_map.find(key);
661
3
    if (it == txn_tablet_map.end()) {
662
0
        return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map");
663
0
    }
664
3
    auto load_itr = it->second.find(tablet_info);
665
3
    if (load_itr != it->second.end()) {
666
        // found load for txn,tablet
667
        // case 1: user commit rowset, then the load id must be equal
668
3
        auto& load_info = load_itr->second;
669
3
        auto& rowset = load_info->rowset;
670
3
        if (rowset != nullptr && meta != nullptr) {
671
2
            if (!rowset->is_pending()) {
672
0
                return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
673
0
                        "could not delete transaction from engine, just remove it from memory not "
674
0
                        "delete from disk, because related rowset already published. partition_id: "
675
0
                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
676
0
                        key.first, key.second, tablet_info.to_string(),
677
0
                        rowset->rowset_id().to_string(), rowset->version().to_string(),
678
0
                        RowsetStatePB_Name(rowset->rowset_meta_state()));
679
2
            } else {
680
2
                static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
681
#ifndef BE_TEST
682
                _engine.add_unused_rowset(rowset);
683
#endif
684
2
                VLOG_NOTICE << "delete transaction from engine successfully."
685
0
                            << " partition_id: " << key.first << ", transaction_id: " << key.second
686
0
                            << ", tablet: " << tablet_info.to_string() << ", rowset: "
687
0
                            << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
688
2
            }
689
2
        }
690
3
        it->second.erase(load_itr);
691
3
    }
692
3
    if (it->second.empty()) {
693
3
        txn_tablet_map.erase(it);
694
3
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
695
3
    }
696
3
    return Status::OK();
697
3
}
698
699
void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid,
700
                                         int64_t* partition_id,
701
4
                                         std::set<int64_t>* transaction_ids) {
702
4
    if (partition_id == nullptr || transaction_ids == nullptr) {
703
0
        LOG(WARNING) << "parameter is null when get transactions by tablet";
704
0
        return;
705
0
    }
706
707
4
    TabletInfo tablet_info(tablet_id, tablet_uid);
708
8
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
709
4
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
710
4
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
711
4
        for (auto& it : txn_tablet_map) {
712
0
            if (it.second.find(tablet_info) != it.second.end()) {
713
0
                *partition_id = it.first.first;
714
0
                transaction_ids->insert(it.first.second);
715
0
                VLOG_NOTICE << "find transaction on tablet."
716
0
                            << "partition_id: " << it.first.first
717
0
                            << ", transaction_id: " << it.first.second
718
0
                            << ", tablet: " << tablet_info.to_string();
719
0
            }
720
0
        }
721
4
    }
722
4
}
723
724
// force drop all txns related with the tablet
725
// maybe lock error, because not get txn lock before remove from meta
726
void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
727
0
                                                    TabletUid tablet_uid) {
728
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
729
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
730
0
        std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]);
731
0
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
732
0
        for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
733
0
            auto load_itr = it->second.find(tablet_info);
734
0
            if (load_itr != it->second.end()) {
735
0
                auto& load_info = load_itr->second;
736
0
                auto& rowset = load_info->rowset;
737
0
                if (rowset != nullptr && meta != nullptr) {
738
0
                    LOG(INFO) << " delete transaction from engine "
739
0
                              << ", tablet: " << tablet_info.to_string()
740
0
                              << ", rowset id: " << rowset->rowset_id();
741
0
                    static_cast<void>(
742
0
                            RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
743
0
                }
744
0
                LOG(INFO) << "remove tablet related txn."
745
0
                          << " partition_id: " << it->first.first
746
0
                          << ", transaction_id: " << it->first.second
747
0
                          << ", tablet: " << tablet_info.to_string() << ", rowset: "
748
0
                          << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
749
0
                it->second.erase(load_itr);
750
0
            }
751
0
            if (it->second.empty()) {
752
0
                _clear_txn_partition_map_unlocked(it->first.second, it->first.first);
753
0
                it = txn_tablet_map.erase(it);
754
0
            } else {
755
0
                ++it;
756
0
            }
757
0
        }
758
0
    }
759
0
    if (meta != nullptr) {
760
0
        Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
761
0
        if (!st.ok()) {
762
0
            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id,
763
0
                        st.to_string());
764
0
        }
765
0
    }
766
0
}
767
768
void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
769
                                         TPartitionId partition_id,
770
32
                                         std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
771
    // get tablets in this transaction
772
32
    pair<int64_t, int64_t> key(partition_id, transaction_id);
773
32
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
774
32
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
775
32
    auto it = txn_tablet_map.find(key);
776
32
    if (it == txn_tablet_map.end()) {
777
2
        VLOG_NOTICE << "could not find tablet for"
778
0
                    << " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
779
2
        return;
780
2
    }
781
30
    auto& load_info_map = it->second;
782
783
    // each tablet
784
36
    for (auto& load_info : load_info_map) {
785
36
        const TabletInfo& tablet_info = load_info.first;
786
        // must not check rowset == null here, because if rowset == null
787
        // publish version should failed
788
36
        tablet_infos->emplace(tablet_info, load_info.second->rowset);
789
36
    }
790
30
}
791
792
0
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
793
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
794
0
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
795
0
        for (auto& it : _txn_tablet_maps[i]) {
796
0
            for (auto& tablet_load_it : it.second) {
797
0
                tablet_infos->emplace(tablet_load_it.first);
798
0
            }
799
0
        }
800
0
    }
801
0
}
802
803
void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
804
1
        const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) {
805
2
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
806
1
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
807
2
        for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) {
808
2
            auto tablet_load_it = load_info_map.find(tablet.get_tablet_info());
809
2
            if (tablet_load_it != load_info_map.end()) {
810
2
                const auto& [_, load_info] = *tablet_load_it;
811
2
                const auto& rowset = load_info->rowset;
812
2
                const auto& delete_bitmap = load_info->delete_bitmap;
813
2
                if (!rowset || !delete_bitmap) {
814
1
                    continue;
815
1
                }
816
1
                commit_tablet_txn_info_vec->push_back({
817
1
                        .transaction_id = txn_key.second,
818
1
                        .partition_id = txn_key.first,
819
1
                        .delete_bitmap = delete_bitmap,
820
1
                        .rowset_ids = load_info->rowset_ids,
821
1
                        .partial_update_info = load_info->partial_update_info,
822
1
                });
823
1
            }
824
2
        }
825
1
    }
826
1
}
827
828
0
void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
829
0
    int64_t now = UnixSeconds();
830
    // traverse the txn map, and get all expired txns
831
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
832
0
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
833
0
        for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) {
834
0
            auto txn_id = txn_key.second;
835
0
            for (auto&& [tablet_info, txn_info] : tablet_txn_infos) {
836
0
                double diff = difftime(now, txn_info->creation_time);
837
0
                if (diff < config::pending_data_expire_time_sec) {
838
0
                    continue;
839
0
                }
840
841
0
                (*expire_txn_map)[tablet_info].push_back(txn_id);
842
0
                if (VLOG_IS_ON(3)) {
843
0
                    VLOG_NOTICE << "find expired txn."
844
0
                                << " tablet=" << tablet_info.to_string()
845
0
                                << " transaction_id=" << txn_id << " exist_sec=" << diff;
846
0
                }
847
0
            }
848
0
        }
849
0
    }
850
0
}
851
852
void TxnManager::get_partition_ids(const TTransactionId transaction_id,
853
0
                                   std::vector<TPartitionId>* partition_ids) {
854
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
855
0
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
856
0
    auto it = txn_partition_map.find(transaction_id);
857
0
    if (it != txn_partition_map.end()) {
858
0
        for (int64_t partition_id : it->second) {
859
0
            partition_ids->push_back(partition_id);
860
0
        }
861
0
    }
862
0
}
863
864
61
void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
865
61
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
866
61
    auto find = txn_partition_map.find(transaction_id);
867
61
    if (find == txn_partition_map.end()) {
868
37
        txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
869
37
    }
870
61
    txn_partition_map[transaction_id].insert(partition_id);
871
61
}
872
873
20
void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
874
20
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
875
20
    auto it = txn_partition_map.find(transaction_id);
876
20
    if (it != txn_partition_map.end()) {
877
20
        it->second.erase(partition_id);
878
20
        if (it->second.empty()) {
879
20
            txn_partition_map.erase(it);
880
20
        }
881
20
    }
882
20
}
883
884
void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
885
0
                                             DeltaWriter* delta_writer) {
886
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
887
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
888
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
889
0
            _get_txn_tablet_delta_writer_map(transaction_id);
890
0
    auto find = txn_tablet_delta_writer_map.find(transaction_id);
891
0
    if (find == txn_tablet_delta_writer_map.end()) {
892
0
        txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>();
893
0
    }
894
0
    txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer;
895
0
}
896
897
void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id,
898
0
                                                 int64_t node_id, bool is_succeed) {
899
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
900
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
901
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
902
0
            _get_txn_tablet_delta_writer_map(transaction_id);
903
0
    auto find_txn = txn_tablet_delta_writer_map.find(transaction_id);
904
0
    if (find_txn == txn_tablet_delta_writer_map.end()) {
905
0
        LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id
906
0
                     << ", tablet_id=" << tablet_id;
907
0
        return;
908
0
    }
909
0
    auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id);
910
0
    if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) {
911
0
        LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id
912
0
                     << ", tablet_id=" << tablet_id;
913
0
        return;
914
0
    }
915
0
    DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id];
916
0
    delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed);
917
0
}
918
919
0
void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
920
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
921
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
922
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
923
0
            _get_txn_tablet_delta_writer_map(transaction_id);
924
0
    auto it = txn_tablet_delta_writer_map.find(transaction_id);
925
0
    if (it != txn_tablet_delta_writer_map.end()) {
926
0
        txn_tablet_delta_writer_map.erase(it);
927
0
    }
928
0
    VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
929
0
}
930
931
6
int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) {
932
6
    char key[16];
933
6
    memcpy(key, &tablet_id, sizeof(int64_t));
934
6
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
935
6
    CacheKey cache_key((const char*)&key, sizeof(key));
936
937
6
    auto* handle = _tablet_version_cache->lookup(cache_key);
938
6
    if (handle == nullptr) {
939
1
        return -1;
940
1
    }
941
5
    int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value;
942
5
    _tablet_version_cache->release(handle);
943
5
    return res;
944
6
}
945
946
4
void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) {
947
4
    char key[16];
948
4
    memcpy(key, &tablet_id, sizeof(int64_t));
949
4
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
950
4
    CacheKey cache_key((const char*)&key, sizeof(key));
951
952
4
    auto* value = new CacheValue;
953
4
    value->value = txn_id;
954
4
    auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id),
955
4
                                                 CachePriority::NORMAL);
956
4
    _tablet_version_cache->release(handle);
957
4
}
958
959
// Return the recorded state of the txn record for (partition_id, transaction_id) and the
// given tablet, or TxnState::NOT_FOUND when either the txn or that tablet's record is absent.
TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
                                   TTabletId tablet_id, TabletUid tablet_uid) {
    pair<int64_t, int64_t> txn_key(partition_id, transaction_id);
    TabletInfo target(tablet_id, tablet_uid);

    std::shared_lock map_guard(_get_txn_map_lock(transaction_id));
    const auto& tablet_map = _get_txn_tablet_map(transaction_id);

    auto txn_iter = tablet_map.find(txn_key);
    if (txn_iter != tablet_map.end()) {
        const auto& tablet_infos = txn_iter->second;
        if (auto info_iter = tablet_infos.find(target); info_iter != tablet_infos.end()) {
            return info_iter->second->state;
        }
    }
    return TxnState::NOT_FOUND;
}
981
982
} // namespace doris