Coverage Report

Created: 2025-04-12 02:12

/root/doris/be/src/olap/txn_manager.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "txn_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <fmt/ranges.h>
23
#include <thrift/protocol/TDebugProtocol.h>
24
#include <time.h>
25
26
#include <filesystem>
27
#include <iterator>
28
#include <list>
29
#include <new>
30
#include <ostream>
31
#include <queue>
32
#include <set>
33
#include <string>
34
35
#include "common/config.h"
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "olap/data_dir.h"
39
#include "olap/delta_writer.h"
40
#include "olap/olap_common.h"
41
#include "olap/partial_update_info.h"
42
#include "olap/rowset/beta_rowset.h"
43
#include "olap/rowset/pending_rowset_helper.h"
44
#include "olap/rowset/rowset_meta.h"
45
#include "olap/rowset/rowset_meta_manager.h"
46
#include "olap/schema_change.h"
47
#include "olap/segment_loader.h"
48
#include "olap/storage_engine.h"
49
#include "olap/tablet_manager.h"
50
#include "olap/tablet_meta.h"
51
#include "olap/tablet_meta_manager.h"
52
#include "olap/task/engine_publish_version_task.h"
53
#include "util/debug_points.h"
54
#include "util/time.h"
55
56
namespace doris {
57
class OlapMeta;
58
} // namespace doris
59
60
using std::map;
61
using std::pair;
62
using std::set;
63
using std::string;
64
using std::stringstream;
65
using std::vector;
66
67
namespace doris {
68
using namespace ErrorCode;
69
70
// Exported bvar counter for (partition_id, transaction_id) keys held in the txn_tablet_map shards.
// In the code visible here, prepare_txn adds 1 when it is about to insert a key that is not yet
// present, and _remove_txn_tablet_info_unlocked subtracts 1 when it erases a key whose per-tablet
// map became empty.
bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count");
71
72
// Constructs the manager and allocates its sharded state.
//   engine             - storage engine this manager belongs to (stored by reference).
//   txn_map_shard_size - number of shards for the txn maps and their locks.
//   txn_shard_size     - number of per-transaction mutexes.
// Both sizes are DCHECKed to be positive powers of two (presumably so a shard can be selected
// with a bit mask -- confirm against the _get_* helpers, which are not visible here).
TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
        : _engine(engine),
          _txn_map_shard_size(txn_map_shard_size),
          _txn_shard_size(txn_shard_size) {
    DCHECK_GT(_txn_map_shard_size, 0);
    DCHECK_GT(_txn_shard_size, 0);
    // x & (x - 1) == 0 holds only for powers of two (given x > 0).
    DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
    DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);

    // One mutex per transaction shard.
    _txn_mutex = new std::shared_mutex[_txn_shard_size];

    // Everything below is sharded by _txn_map_shard_size: each map array has a matching lock array.
    _txn_map_locks = new std::shared_mutex[_txn_map_shard_size];
    _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
    _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
    _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size];
    _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size];

    // For debugging
    _tablet_version_cache = std::make_unique<TabletVersionCache>(100000);
}
89
90
// prepare txn should always be allowed because ingest task will be retried
91
// could not distinguish rollup, schema change or base table, prepare txn successfully will allow
92
// ingest retried
93
// Tablet-based convenience overload of prepare_txn.
// Rejects the request with InternalError when the tablet is in TABLET_SHUTDOWN state; otherwise
// forwards to the id-based overload using the tablet's id and uid.
Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet,
                               TTransactionId transaction_id, const PUniqueId& load_id,
                               bool ingest) {
    // A shutdown tablet is an old tablet: importing data into it may lead to data loss during
    // migration, so only live tablets are allowed to register the txn.
    if (tablet.tablet_state() != TABLET_SHUTDOWN) {
        return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(),
                           load_id, ingest);
    }
    return Status::InternalError<false>(
            "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped "
            "or migrationed. Please check if the table has been dropped or try again.",
            tablet.tablet_id());
}
108
109
// most used for ut
110
// Registers (partition_id, transaction_id, tablet) in the txn map with an empty rowset.
//
// Returns:
//   - OK if the txn/tablet pair is already registered with a rowset under the SAME load id
//     (nothing is changed in that case);
//   - TOO_MANY_TRANSACTIONS if the partition map shard for this txn exceeds
//     config::max_runnings_transactions_per_txn_map;
//   - InternalError only from the "random_failed" debug point;
//   - OK after inserting (or overwriting) the TabletTxnInfo otherwise.
//
// The whole function runs under the exclusive txn-map lock for transaction_id's shard.
Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
                               TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
                               bool ingest) {
    TxnKey key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);

    DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug prepare txn random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.prepare_txn.wait")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    /// Step 1: check if the transaction is already exist
    if (auto iter = txn_tablet_map.find(key); iter != txn_tablet_map.end()) {
        auto& txn_tablet_info_map = iter->second;
        if (auto load_itr = txn_tablet_info_map.find(tablet_info);
            load_itr != txn_tablet_info_map.end()) {
            // found load for txn,tablet
            // case 1: user commit rowset, then the load id must be equal.
            // NOTE: compare against the caller's `load_id` parameter. A local alias named
            // `load_id` used to shadow the parameter here, which turned this check into a
            // self-comparison that was always true.
            const auto& existing_info = load_itr->second;
            if (existing_info->load_id.hi() == load_id.hi() &&
                existing_info->load_id.lo() == load_id.lo() && existing_info->rowset != nullptr) {
                LOG(WARNING) << "find transaction exists when add to engine."
                             << "partition_id: " << key.first << ", transaction_id: " << key.second
                             << ", tablet: " << tablet_info.to_string();
                return Status::OK();
            }
        }
    }

    /// Step 2: check if there are too many transactions on running.
    // if yes, reject the request.
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
    if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
        // Report the size that was actually compared against the limit.
        return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}",
                                                    txn_partition_map.size(),
                                                    config::max_runnings_transactions_per_txn_map);
    }

    /// Step 3: Add transaction to engine
    // not found load id
    // case 1: user start a new txn, rowset = null
    // case 2: loading txn from meta env
    auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest);
    load_info->prepare();
    // Count the key only when operator[] below is about to create it.
    if (!txn_tablet_map.contains(key)) {
        g_tablet_txn_info_txn_partitions_count << 1;
    }
    txn_tablet_map[key][tablet_info] = std::move(load_info);
    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
    VLOG_NOTICE << "add transaction to engine successfully."
                << "partition_id: " << key.first << ", transaction_id: " << key.second
                << ", tablet: " << tablet_info.to_string();
    return Status::OK();
}
190
191
// Tablet-based convenience overload of commit_txn.
// Resolves the meta store from the tablet's data dir and forwards everything else to the
// meta-based overload, using the tablet's id and uid. Ownership of `guard` is passed on.
Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
                              TTransactionId transaction_id, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    OlapMeta* tablet_meta_store = tablet.data_dir()->get_meta();
    return commit_txn(tablet_meta_store, partition_id, transaction_id, tablet.tablet_id(),
                      tablet.tablet_uid(), load_id, rowset_ptr, std::move(guard), is_recovery,
                      partial_update_info);
}
200
201
// Tablet-based convenience overload of publish_txn.
// Resolves the meta store from the tablet's data dir and forwards to the meta-based overload
// with the tablet's id and uid. `tablet` is dereferenced unconditionally.
Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                               TTransactionId transaction_id, const Version& version,
                               TabletPublishStatistics* stats) {
    OlapMeta* tablet_meta_store = tablet->data_dir()->get_meta();
    return publish_txn(tablet_meta_store, partition_id, transaction_id, tablet->tablet_id(),
                       tablet->tablet_uid(), version, stats);
}
207
208
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
209
0
                           TTabletId tablet_id, TabletUid tablet_uid) {
210
0
    pair<int64_t, int64_t> key(partition_id, transaction_id);
211
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
212
213
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
214
215
0
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
216
0
    auto it = txn_tablet_map.find(key);
217
0
    if (it == txn_tablet_map.end()) {
218
0
        return;
219
0
    }
220
221
0
    auto& tablet_txn_info_map = it->second;
222
0
    auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
223
0
    if (tablet_txn_info_iter == tablet_txn_info_map.end()) {
224
0
        return;
225
0
    }
226
227
0
    auto& txn_info = tablet_txn_info_iter->second;
228
0
    txn_info->abort();
229
0
}
230
231
// delete the txn from manager if it is not committed(not have a valid rowset)
232
// Tablet-based convenience overload: forwards to the id-based rollback_txn using the tablet's
// id and uid, and returns its status unchanged.
Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet,
                                TTransactionId transaction_id) {
    const auto target_tablet_id = tablet.tablet_id();
    const auto target_tablet_uid = tablet.tablet_uid();
    return rollback_txn(partition_id, transaction_id, target_tablet_id, target_tablet_uid);
}
236
237
// Tablet-based convenience overload of delete_txn.
// Resolves the meta store from the tablet's data dir and forwards to the meta-based overload
// with the tablet's id and uid. `tablet` is dereferenced unconditionally.
Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                              TTransactionId transaction_id) {
    OlapMeta* tablet_meta_store = tablet->data_dir()->get_meta();
    return delete_txn(tablet_meta_store, partition_id, transaction_id, tablet->tablet_id(),
                      tablet->tablet_uid());
}
242
243
void TxnManager::set_txn_related_delete_bitmap(
244
        TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
245
        TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap,
246
        const RowsetIdUnorderedSet& rowset_ids,
247
5
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
248
5
    pair<int64_t, int64_t> key(partition_id, transaction_id);
249
5
    TabletInfo tablet_info(tablet_id, tablet_uid);
250
251
5
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
252
5
    {
253
        // get tx
254
5
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
255
5
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
256
5
        auto it = txn_tablet_map.find(key);
257
5
        if (it == txn_tablet_map.end()) {
258
1
            LOG(WARNING) << "transaction_id: " << transaction_id
259
1
                         << " partition_id: " << partition_id << " may be cleared";
260
1
            return;
261
1
        }
262
4
        auto load_itr = it->second.find(tablet_info);
263
4
        if (load_itr == it->second.end()) {
264
0
            LOG(WARNING) << "transaction_id: " << transaction_id
265
0
                         << " partition_id: " << partition_id << " tablet_id: " << tablet_id
266
0
                         << " may be cleared";
267
0
            return;
268
0
        }
269
4
        auto& load_info = load_itr->second;
270
4
        load_info->unique_key_merge_on_write = unique_key_merge_on_write;
271
4
        load_info->delete_bitmap = delete_bitmap;
272
4
        load_info->rowset_ids = rowset_ids;
273
4
        load_info->partial_update_info = partial_update_info;
274
4
    }
275
4
}
276
277
// Records `rowset_ptr` as the committed rowset of (partition_id, transaction_id, tablet).
//
// Parameters:
//   meta                - meta store used to persist the rowset meta / partial update info.
//   load_id             - load id of the caller; compared with an already committed entry.
//   rowset_ptr          - rowset to commit; must not be null.
//   guard               - pending-rowset guard, moved into the stored TabletTxnInfo.
//   is_recovery         - when true nothing is persisted; instead the partial update info is
//                         read back from `meta` for merge-on-write tablets.
//   partial_update_info - persisted alongside the rowset meta when it is a partial update.
//
// Returns:
//   - InternalError for non-positive ids, ROWSET_INVALID for a null rowset;
//   - OK without changes (except taking over `guard`) when the same rowset id is already
//     committed under the same load id;
//   - PUSH_TRANSACTION_ALREADY_EXIST when a different rowset is already committed under the
//     same load id;
//   - the failing status of the meta store operations;
//   - OK after the entry has been inserted/overwritten.
//
// Locking: the per-txn lock is held exclusively for the whole operation (after the debug
// points). The txn map shard lock is held shared for the duplicate check and exclusively for
// the final insert, but NOT while persisting to the meta store, which may be slow.
Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                              TTransactionId transaction_id, TTabletId tablet_id,
                              TabletUid tablet_uid, const PUniqueId& load_id,
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
                              bool is_recovery,
                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
    if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
        LOG(WARNING) << "invalid commit req "
                     << " partition_id=" << partition_id << " transaction_id=" << transaction_id
                     << " tablet_id=" << tablet_id;
        return Status::InternalError("invalid partition id");
    }

    pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    if (rowset_ptr == nullptr) {
        return Status::Error<ROWSET_INVALID>(
                "could not commit txn because rowset ptr is null. partition_id: {}, "
                "transaction_id: {}, tablet: {}",
                key.first, key.second, tablet_info.to_string());
    }

    DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.commit_txn.random_failed")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug commit txn random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.commit_txn.wait")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));

    // Duplicate-commit check, under a shared map lock that is released at the end of this scope.
    {
        std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
        auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
        // TODO(dx): remove log after fix partition id eq 0 bug
        if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
            rowset_ptr->rowset_meta()->set_partition_id(partition_id);
            LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
                         << partition_id;
        }

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
            if (auto load_itr = it->second.find(tablet_info); load_itr != it->second.end()) {
                // found load for txn,tablet
                // case 1: user commit rowset, then the load id must be equal
                auto& load_info = load_itr->second;
                const bool already_committed_by_same_load =
                        load_info->rowset != nullptr &&
                        load_info->load_id.hi() == load_id.hi() &&
                        load_info->load_id.lo() == load_id.lo();
                if (already_committed_by_same_load) {
                    // find a rowset with same rowset id, then it means a duplicate call
                    if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) {
                        LOG(INFO) << "find rowset exists when commit transaction to engine."
                                  << "partition_id: " << key.first
                                  << ", transaction_id: " << key.second
                                  << ", tablet: " << tablet_info.to_string()
                                  << ", rowset_id: " << load_info->rowset->rowset_id();
                        // Should not remove this rowset from pending rowsets
                        load_info->pending_rs_guard = std::move(guard);
                        return Status::OK();
                    }

                    // find a rowset with different rowset id, then it should not happen, just
                    // return errors
                    return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
                            "find rowset exists when commit transaction to engine. but rowset ids "
                            "are not "
                            "same. partition_id: {}, transaction_id: {}, tablet: {}, exist "
                            "rowset_id: {}, new "
                            "rowset_id: {}",
                            key.first, key.second, tablet_info.to_string(),
                            load_info->rowset->rowset_id().to_string(),
                            rowset_ptr->rowset_id().to_string());
                }
            }
        }
    }

    TabletSharedPtr tablet;
    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
    if (!is_recovery) {
        // if not in recovery mode, then should persist the meta to meta env
        // save meta need access disk, it maybe very slow, so that it is not in global txn lock
        // it is under a single txn lock
        Status save_status =
                RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
                                        rowset_ptr->rowset_meta()->get_rowset_pb(), false);
        DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
            if (auto wait = dp->param<int>("duration", 0); wait > 0) {
                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
                        .tag("txn_id", transaction_id)
                        .tag("tablet_id", tablet_id)
                        .tag("wait ms", wait);
                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
            }
        });
        if (!save_status.ok()) {
            save_status.append(fmt::format(", txn id: {}", transaction_id));
            return save_status;
        }

        if (partial_update_info && partial_update_info->is_partial_update()) {
            PartialUpdateInfoPB partial_update_info_pb;
            partial_update_info->to_pb(&partial_update_info_pb);
            save_status = RowsetMetaManager::save_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, partial_update_info_pb);
            if (!save_status.ok()) {
                save_status.append(fmt::format(", txn_id: {}", transaction_id));
                return save_status;
            }
        }
    } else {
        // Recovery: nothing is written; for merge-on-write tablets try to read back the partial
        // update info that was persisted at commit time.
        tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
            PartialUpdateInfoPB partial_update_info_pb;
            auto st = RowsetMetaManager::try_get_partial_update_info(
                    meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb);
            if (st.ok()) {
                decoded_partial_update_info = std::make_shared<PartialUpdateInfo>();
                decoded_partial_update_info->from_pb(&partial_update_info_pb);
                DCHECK(decoded_partial_update_info->is_partial_update());
            } else if (!st.is<META_KEY_NOT_FOUND>()) {
                // Any error other than META_KEY_NOT_FOUND is a real failure.
                return st;
            }
            // META_KEY_NOT_FOUND: the load is not a partial update, nothing to restore.
        }
    }

    {
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
        auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
        load_info->pending_rs_guard = std::move(guard);
        // `tablet` is only looked up in recovery mode, see above.
        if (is_recovery && tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
            load_info->unique_key_merge_on_write = true;
            load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
            if (decoded_partial_update_info) {
                LOG_INFO(
                        "get partial update info from RocksDB during recovery. txn_id={}, "
                        "partition_id={}, tablet_id={}, partial_update_info=[{}]",
                        transaction_id, partition_id, tablet_id,
                        decoded_partial_update_info->summary());
                load_info->partial_update_info = decoded_partial_update_info;
            }
        }
        load_info->commit();

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        // A commit without a preceding prepare (e.g. recovery) creates the key here. Count it
        // so the decrement done when the key is erased keeps the counter balanced.
        if (!txn_tablet_map.contains(key)) {
            g_tablet_txn_info_txn_partitions_count << 1;
        }
        txn_tablet_map[key][tablet_info] = std::move(load_info);
        _insert_txn_partition_map_unlocked(transaction_id, partition_id);
        VLOG_NOTICE << "commit transaction to engine successfully."
                    << " partition_id: " << key.first << ", transaction_id: " << key.second
                    << ", tablet: " << tablet_info.to_string()
                    << ", rowsetid: " << rowset_ptr->rowset_id()
                    << ", version: " << rowset_ptr->version().first;
    }
    return Status::OK();
}
455
456
// remove a txn from txn manager
457
// Publishes the rowset committed for (partition_id, transaction_id, tablet) at `version` and
// removes the entry from the txn manager.
//
// Steps: look up the committed rowset, make it visible, update and persist the delete bitmap
// for merge-on-write txns, add the rowset to the binlog when enabled, persist the rowset meta,
// drop the persisted partial update info, and finally erase the txn entry.
//
// Returns OK immediately when the tablet is not known to the tablet manager,
// TRANSACTION_NOT_EXIST when no committed rowset is registered, ROWSET_ADD_TO_BINLOG_FAILED
// when the binlog step fails, or the failing status of any other step.
// `stats` must not be null; timing counters on it are updated along the way.
//
// Locking: the txn lock and the map lock are held shared only for the lookup and exclusively
// only for the final removal; no TxnManager lock is held in between.
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
                               TTransactionId transaction_id, TTabletId tablet_id,
                               TabletUid tablet_uid, const Version& version,
                               TabletPublishStatistics* stats) {
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        return Status::OK();
    }
    DCHECK(stats != nullptr);

    pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);
    RowsetSharedPtr rowset;
    std::shared_ptr<TabletTxnInfo> tablet_txn_info;
    int64_t lookup_lock_start_us = MonotonicMicros();
    /// Step 1: get rowset, tablet_txn_info by key
    {
        std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
        std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
        stats->lock_wait_time_us += MonotonicMicros() - lookup_lock_start_us;

        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
        if (auto key_pos = txn_tablet_map.find(key); key_pos != txn_tablet_map.end()) {
            auto& infos_by_tablet = key_pos->second;
            if (auto info_pos = infos_by_tablet.find(tablet_info);
                info_pos != infos_by_tablet.end()) {
                // Keep shared ownership of both so they stay usable after the locks are dropped.
                tablet_txn_info = info_pos->second;
                rowset = tablet_txn_info->rowset;
            }
        }
    }
    if (rowset == nullptr) {
        return Status::Error<TRANSACTION_NOT_EXIST>(
                "publish txn failed, rowset not found. partition_id={}, transaction_id={}, "
                "tablet={}",
                partition_id, transaction_id, tablet_info.to_string());
    }
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug publish txn before save rs meta random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    /// Step 2: make rowset visible
    // TODO(ygl): rowset is already set version here, memory is changed, if save failed
    // it maybe a fatal error
    rowset->make_visible(version);

    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
            LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id);
            return Status::InternalError("debug publish txn after save rs meta random failed");
        }
    });
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
                    .tag("txn_id", transaction_id)
                    .tag("tablet_id", tablet_id)
                    .tag("wait ms", wait);
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
        }
    });

    // update delete_bitmap
    if (tablet_txn_info->unique_key_merge_on_write) {
        int64_t calc_start_us = MonotonicMicros();
        // The bitmap between segments has not been calculated for this multi-segment rowset
        // yet, so calculate it now.
        const bool needs_segment_calc =
                rowset->num_segments() > 1 &&
                !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments(
                        rowset->rowset_id());
        if (needs_segment_calc) {
            std::vector<segment_v2::SegmentSharedPtr> segments;
            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
                    rowset->rowset_id(), segments, tablet_txn_info->delete_bitmap));
        }

        RETURN_IF_ERROR(
                Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id));
        int64_t save_bitmap_start_us = MonotonicMicros();
        stats->calc_delete_bitmap_time_us = save_bitmap_start_us - calc_start_us;
        RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
                tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap,
                version.second));
        stats->save_meta_time_us = MonotonicMicros() - save_bitmap_start_us;
    }

    /// Step 3:  add to binlog
    auto enable_binlog = tablet->is_enable_binlog();
    if (enable_binlog) {
        auto binlog_status = rowset->add_to_binlog();
        if (!binlog_status.ok()) {
            return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>(
                    "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, "
                    "txn id: {}, status: {}",
                    rowset->rowset_id().to_string(), tablet_id, transaction_id,
                    binlog_status.to_string_no_stack());
        }
    }

    /// Step 4: save meta
    int64_t save_meta_start_us = MonotonicMicros();
    auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(),
                                          rowset->rowset_meta()->get_rowset_pb(), enable_binlog);
    stats->save_meta_time_us += MonotonicMicros() - save_meta_start_us;
    if (!status.ok()) {
        status.append(fmt::format(", txn id: {}", transaction_id));
        return status;
    }

    if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info &&
        tablet_txn_info->partial_update_info->is_partial_update()) {
        // NOTE: `status` is overwritten here and is what the function finally returns.
        status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id,
                                                               transaction_id);
        if (!status) {
            // discard the error status and print the warning log
            LOG_WARNING(
                    "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, "
                    "tablet_id={}, tablet_uid={}",
                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
                    tablet_uid.to_string());
        }
    }

    // TODO(Drogon): remove these test codes
    if (enable_binlog) {
        auto version_str = fmt::format("{}", version.first);
        VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id,
                                  version_str, tablet->get_binlog_filepath(version_str));
    }

    /// Step 5: remove tablet_info from tnx_tablet_map
    // txn_tablet_map[key] empty, remove key from txn_tablet_map
    int64_t remove_lock_start_us = MonotonicMicros();
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
    stats->lock_wait_time_us += MonotonicMicros() - remove_lock_start_us;
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
                                     wrlock);
    VLOG_NOTICE << "publish txn successfully."
                << " partition_id: " << key.first << ", txn_id: " << key.second
                << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id()
                << ", version: " << version.first << "," << version.second;
    return status;
}
619
620
void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id,
621
                                                  TTransactionId transaction_id,
622
                                                  TTabletId tablet_id, TabletUid tablet_uid,
623
                                                  std::lock_guard<std::shared_mutex>& txn_lock,
624
14
                                                  std::lock_guard<std::shared_mutex>& wrlock) {
625
14
    std::pair<int64_t, int64_t> key {partition_id, transaction_id};
626
14
    TabletInfo tablet_info {tablet_id, tablet_uid};
627
14
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
628
14
    if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
629
14
        it->second.erase(tablet_info);
630
14
        if (it->second.empty()) {
631
14
            txn_tablet_map.erase(it);
632
14
            g_tablet_txn_info_txn_partitions_count << -1;
633
14
            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
634
14
        }
635
14
    }
636
14
}
637
638
void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id,
639
0
                                        TTabletId tablet_id, TabletUid tablet_uid) {
640
0
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
641
0
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
642
0
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
643
0
                                     wrlock);
644
0
}
645
646
// txn could be rollbacked if it does not have related rowset
647
// if the txn has related rowset then could not rollback it, because it
648
// may be committed in another thread and our current thread meets errors when writing to data file
649
// BE has to wait for fe call clear txn api
650
Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
651
7
                                TTabletId tablet_id, TabletUid tablet_uid) {
652
7
    pair<int64_t, int64_t> key(partition_id, transaction_id);
653
7
    TabletInfo tablet_info(tablet_id, tablet_uid);
654
655
7
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
656
7
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
657
658
7
    auto it = txn_tablet_map.find(key);
659
7
    if (it == txn_tablet_map.end()) {
660
0
        return Status::OK();
661
0
    }
662
663
7
    auto& tablet_txn_info_map = it->second;
664
7
    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
665
7
        load_itr != tablet_txn_info_map.end()) {
666
        // found load for txn,tablet
667
        // case 1: user commit rowset, then the load id must be equal
668
7
        const auto& load_info = load_itr->second;
669
7
        if (load_info->rowset != nullptr) {
670
1
            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
671
1
                    "if rowset is not null, it means other thread may commit the rowset should "
672
1
                    "not delete txn any more");
673
1
        }
674
7
    }
675
676
6
    tablet_txn_info_map.erase(tablet_info);
677
6
    LOG(INFO) << "rollback transaction from engine successfully."
678
6
              << " partition_id: " << key.first << ", transaction_id: " << key.second
679
6
              << ", tablet: " << tablet_info.to_string();
680
6
    if (tablet_txn_info_map.empty()) {
681
6
        txn_tablet_map.erase(it);
682
6
        g_tablet_txn_info_txn_partitions_count << -1;
683
6
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
684
6
    }
685
6
    return Status::OK();
686
7
}
687
688
// fe call this api to clear unused rowsets in be
689
// could not delete the rowset if it already has a valid version
690
Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
691
                              TTransactionId transaction_id, TTabletId tablet_id,
692
3
                              TabletUid tablet_uid) {
693
3
    pair<int64_t, int64_t> key(partition_id, transaction_id);
694
3
    TabletInfo tablet_info(tablet_id, tablet_uid);
695
3
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
696
3
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
697
3
    auto it = txn_tablet_map.find(key);
698
3
    if (it == txn_tablet_map.end()) {
699
0
        return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map");
700
0
    }
701
3
    auto load_itr = it->second.find(tablet_info);
702
3
    if (load_itr != it->second.end()) {
703
        // found load for txn,tablet
704
        // case 1: user commit rowset, then the load id must be equal
705
3
        auto& load_info = load_itr->second;
706
3
        auto& rowset = load_info->rowset;
707
3
        if (rowset != nullptr && meta != nullptr) {
708
2
            if (!rowset->is_pending()) {
709
0
                return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
710
0
                        "could not delete transaction from engine, just remove it from memory not "
711
0
                        "delete from disk, because related rowset already published. partition_id: "
712
0
                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
713
0
                        key.first, key.second, tablet_info.to_string(),
714
0
                        rowset->rowset_id().to_string(), rowset->version().to_string(),
715
0
                        RowsetStatePB_Name(rowset->rowset_meta_state()));
716
2
            } else {
717
2
                static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
718
#ifndef BE_TEST
719
                _engine.add_unused_rowset(rowset);
720
#endif
721
2
                VLOG_NOTICE << "delete transaction from engine successfully."
722
0
                            << " partition_id: " << key.first << ", transaction_id: " << key.second
723
0
                            << ", tablet: " << tablet_info.to_string() << ", rowset: "
724
0
                            << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
725
2
            }
726
2
        }
727
3
        it->second.erase(load_itr);
728
3
    }
729
3
    if (it->second.empty()) {
730
3
        txn_tablet_map.erase(it);
731
3
        g_tablet_txn_info_txn_partitions_count << -1;
732
3
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
733
3
    }
734
3
    return Status::OK();
735
3
}
736
737
void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid,
738
                                         int64_t* partition_id,
739
4
                                         std::set<int64_t>* transaction_ids) {
740
4
    if (partition_id == nullptr || transaction_ids == nullptr) {
741
0
        LOG(WARNING) << "parameter is null when get transactions by tablet";
742
0
        return;
743
0
    }
744
745
4
    TabletInfo tablet_info(tablet_id, tablet_uid);
746
8
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
747
4
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
748
4
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
749
4
        for (auto& it : txn_tablet_map) {
750
0
            if (it.second.find(tablet_info) != it.second.end()) {
751
0
                *partition_id = it.first.first;
752
0
                transaction_ids->insert(it.first.second);
753
0
                VLOG_NOTICE << "find transaction on tablet."
754
0
                            << "partition_id: " << it.first.first
755
0
                            << ", transaction_id: " << it.first.second
756
0
                            << ", tablet: " << tablet_info.to_string();
757
0
            }
758
0
        }
759
4
    }
760
4
}
761
762
// force drop all txns related with the tablet
763
// maybe lock error, because not get txn lock before remove from meta
764
void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
765
0
                                                    TabletUid tablet_uid) {
766
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
767
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
768
0
        std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]);
769
0
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
770
0
        for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
771
0
            auto load_itr = it->second.find(tablet_info);
772
0
            if (load_itr != it->second.end()) {
773
0
                auto& load_info = load_itr->second;
774
0
                auto& rowset = load_info->rowset;
775
0
                if (rowset != nullptr && meta != nullptr) {
776
0
                    LOG(INFO) << " delete transaction from engine "
777
0
                              << ", tablet: " << tablet_info.to_string()
778
0
                              << ", rowset id: " << rowset->rowset_id();
779
0
                    static_cast<void>(
780
0
                            RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
781
0
                }
782
0
                LOG(INFO) << "remove tablet related txn."
783
0
                          << " partition_id: " << it->first.first
784
0
                          << ", transaction_id: " << it->first.second
785
0
                          << ", tablet: " << tablet_info.to_string() << ", rowset: "
786
0
                          << (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
787
0
                it->second.erase(load_itr);
788
0
            }
789
0
            if (it->second.empty()) {
790
0
                _clear_txn_partition_map_unlocked(it->first.second, it->first.first);
791
0
                it = txn_tablet_map.erase(it);
792
0
                g_tablet_txn_info_txn_partitions_count << -1;
793
0
            } else {
794
0
                ++it;
795
0
            }
796
0
        }
797
0
    }
798
0
    if (meta != nullptr) {
799
0
        Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
800
0
        if (!st.ok()) {
801
0
            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id,
802
0
                        st.to_string());
803
0
        }
804
0
    }
805
0
}
806
807
void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
808
                                         TPartitionId partition_id,
809
35
                                         std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
810
    // get tablets in this transaction
811
35
    pair<int64_t, int64_t> key(partition_id, transaction_id);
812
35
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
813
35
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
814
35
    auto it = txn_tablet_map.find(key);
815
35
    if (it == txn_tablet_map.end()) {
816
2
        VLOG_NOTICE << "could not find tablet for"
817
0
                    << " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
818
2
        return;
819
2
    }
820
33
    auto& load_info_map = it->second;
821
822
    // each tablet
823
39
    for (auto& load_info : load_info_map) {
824
39
        const TabletInfo& tablet_info = load_info.first;
825
        // must not check rowset == null here, because if rowset == null
826
        // publish version should failed
827
39
        tablet_infos->emplace(tablet_info, load_info.second->rowset);
828
39
    }
829
33
}
830
831
0
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
832
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
833
0
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
834
0
        for (auto& it : _txn_tablet_maps[i]) {
835
0
            for (auto& tablet_load_it : it.second) {
836
0
                tablet_infos->emplace(tablet_load_it.first);
837
0
            }
838
0
        }
839
0
    }
840
0
}
841
842
void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
843
1
        const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) {
844
2
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
845
1
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
846
2
        for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) {
847
2
            auto tablet_load_it = load_info_map.find(tablet.get_tablet_info());
848
2
            if (tablet_load_it != load_info_map.end()) {
849
2
                const auto& [_, load_info] = *tablet_load_it;
850
2
                const auto& rowset = load_info->rowset;
851
2
                const auto& delete_bitmap = load_info->delete_bitmap;
852
2
                if (!rowset || !delete_bitmap) {
853
1
                    continue;
854
1
                }
855
1
                commit_tablet_txn_info_vec->push_back({
856
1
                        .transaction_id = txn_key.second,
857
1
                        .partition_id = txn_key.first,
858
1
                        .delete_bitmap = delete_bitmap,
859
1
                        .rowset_ids = load_info->rowset_ids,
860
1
                        .partial_update_info = load_info->partial_update_info,
861
1
                });
862
1
            }
863
2
        }
864
1
    }
865
1
}
866
867
0
void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
868
0
    int64_t now = UnixSeconds();
869
    // traverse the txn map, and get all expired txns
870
0
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
871
0
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
872
0
        for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) {
873
0
            auto txn_id = txn_key.second;
874
0
            for (auto&& [tablet_info, txn_info] : tablet_txn_infos) {
875
0
                double diff = difftime(now, txn_info->creation_time);
876
0
                if (diff < config::pending_data_expire_time_sec) {
877
0
                    continue;
878
0
                }
879
880
0
                (*expire_txn_map)[tablet_info].push_back(txn_id);
881
0
                if (VLOG_IS_ON(3)) {
882
0
                    VLOG_NOTICE << "find expired txn."
883
0
                                << " tablet=" << tablet_info.to_string()
884
0
                                << " transaction_id=" << txn_id << " exist_sec=" << diff;
885
0
                }
886
0
            }
887
0
        }
888
0
    }
889
0
}
890
891
void TxnManager::get_partition_ids(const TTransactionId transaction_id,
892
0
                                   std::vector<TPartitionId>* partition_ids) {
893
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
894
0
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
895
0
    auto it = txn_partition_map.find(transaction_id);
896
0
    if (it != txn_partition_map.end()) {
897
0
        for (int64_t partition_id : it->second) {
898
0
            partition_ids->push_back(partition_id);
899
0
        }
900
0
    }
901
0
}
902
903
67
void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
904
67
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
905
67
    auto find = txn_partition_map.find(transaction_id);
906
67
    if (find == txn_partition_map.end()) {
907
40
        txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
908
40
    }
909
67
    txn_partition_map[transaction_id].insert(partition_id);
910
67
}
911
912
23
void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
913
23
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
914
23
    auto it = txn_partition_map.find(transaction_id);
915
23
    if (it != txn_partition_map.end()) {
916
23
        it->second.erase(partition_id);
917
23
        if (it->second.empty()) {
918
23
            txn_partition_map.erase(it);
919
23
        }
920
23
    }
921
23
}
922
923
void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
924
0
                                             DeltaWriter* delta_writer) {
925
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
926
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
927
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
928
0
            _get_txn_tablet_delta_writer_map(transaction_id);
929
0
    auto find = txn_tablet_delta_writer_map.find(transaction_id);
930
0
    if (find == txn_tablet_delta_writer_map.end()) {
931
0
        txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>();
932
0
    }
933
0
    txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer;
934
0
}
935
936
void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id,
937
0
                                                 int64_t node_id, bool is_succeed) {
938
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
939
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
940
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
941
0
            _get_txn_tablet_delta_writer_map(transaction_id);
942
0
    auto find_txn = txn_tablet_delta_writer_map.find(transaction_id);
943
0
    if (find_txn == txn_tablet_delta_writer_map.end()) {
944
0
        LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id
945
0
                     << ", tablet_id=" << tablet_id;
946
0
        return;
947
0
    }
948
0
    auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id);
949
0
    if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) {
950
0
        LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id
951
0
                     << ", tablet_id=" << tablet_id;
952
0
        return;
953
0
    }
954
0
    DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id];
955
0
    delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed);
956
0
}
957
958
0
void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
959
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
960
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
961
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
962
0
            _get_txn_tablet_delta_writer_map(transaction_id);
963
0
    auto it = txn_tablet_delta_writer_map.find(transaction_id);
964
0
    if (it != txn_tablet_delta_writer_map.end()) {
965
0
        txn_tablet_delta_writer_map.erase(it);
966
0
    }
967
0
    VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
968
0
}
969
970
6
int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) {
971
6
    char key[16];
972
6
    memcpy(key, &tablet_id, sizeof(int64_t));
973
6
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
974
6
    CacheKey cache_key((const char*)&key, sizeof(key));
975
976
6
    auto* handle = _tablet_version_cache->lookup(cache_key);
977
6
    if (handle == nullptr) {
978
1
        return -1;
979
1
    }
980
5
    int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value;
981
5
    _tablet_version_cache->release(handle);
982
5
    return res;
983
6
}
984
985
4
void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) {
986
4
    char key[16];
987
4
    memcpy(key, &tablet_id, sizeof(int64_t));
988
4
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
989
4
    CacheKey cache_key((const char*)&key, sizeof(key));
990
991
4
    auto* value = new CacheValue;
992
4
    value->value = txn_id;
993
4
    auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id),
994
4
                                                 CachePriority::NORMAL);
995
4
    _tablet_version_cache->release(handle);
996
4
}
997
998
// Returns the state stored for the given tablet within (partition_id, transaction_id), or
// TxnState::NOT_FOUND when either the key or the tablet entry does not exist. Reads under
// the shared txn-map lock for this transaction.
TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
                                   TTabletId tablet_id, TabletUid tablet_uid) {
    const std::pair<int64_t, int64_t> key {partition_id, transaction_id};
    const TabletInfo tablet_info {tablet_id, tablet_uid};

    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);

    auto txn_it = txn_tablet_map.find(key);
    if (txn_it == txn_tablet_map.end()) {
        return TxnState::NOT_FOUND;
    }

    auto& tablets_of_txn = txn_it->second;
    auto tablet_it = tablets_of_txn.find(tablet_info);
    if (tablet_it == tablets_of_txn.end()) {
        return TxnState::NOT_FOUND;
    }
    return tablet_it->second->state;
}
1020
1021
} // namespace doris