Coverage Report

Created: 2026-05-20 07:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/txn/txn_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/txn/txn_manager.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <fmt/ranges.h>
23
#include <thrift/protocol/TDebugProtocol.h>
24
#include <time.h>
25
26
#include <filesystem>
27
#include <iterator>
28
#include <list>
29
#include <new>
30
#include <ostream>
31
#include <queue>
32
#include <set>
33
#include <string>
34
#include <unordered_set>
35
36
#include "common/config.h"
37
#include "common/logging.h"
38
#include "common/status.h"
39
#include "load/delta_writer/delta_writer.h"
40
#include "storage/binlog.h"
41
#include "storage/data_dir.h"
42
#include "storage/olap_common.h"
43
#include "storage/partial_update_info.h"
44
#include "storage/rowset/beta_rowset.h"
45
#include "storage/rowset/pending_rowset_helper.h"
46
#include "storage/rowset/rowset_meta.h"
47
#include "storage/rowset/rowset_meta_manager.h"
48
#include "storage/schema_change/schema_change.h"
49
#include "storage/segment/segment_loader.h"
50
#include "storage/storage_engine.h"
51
#include "storage/tablet/tablet_manager.h"
52
#include "storage/tablet/tablet_meta.h"
53
#include "storage/tablet/tablet_meta_manager.h"
54
#include "storage/task/engine_publish_version_task.h"
55
#include "util/debug_points.h"
56
#include "util/time.h"
57
58
namespace doris {
59
class OlapMeta;
60
} // namespace doris
61
62
using std::map;
63
using std::pair;
64
using std::set;
65
using std::string;
66
using std::stringstream;
67
using std::vector;
68
69
namespace doris {
70
using namespace ErrorCode;
71
72
bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count");
73
74
TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size)
75
382
        : _engine(engine),
76
382
          _txn_map_shard_size(txn_map_shard_size),
77
382
          _txn_shard_size(txn_shard_size) {
78
382
    DCHECK_GT(_txn_map_shard_size, 0);
79
382
    DCHECK_GT(_txn_shard_size, 0);
80
382
    DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0);
81
382
    DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0);
82
382
    _txn_map_locks = new std::shared_mutex[_txn_map_shard_size];
83
382
    _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
84
382
    _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
85
382
    _txn_mutex = new std::shared_mutex[_txn_shard_size];
86
382
    _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size];
87
382
    _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size];
88
    // For debugging
89
382
    _tablet_version_cache = std::make_unique<TabletVersionCache>(100000);
90
382
}
91
92
// prepare txn should always be allowed because ingest task will be retried
93
// could not distinguish rollup, schema change or base table, prepare txn successfully will allow
94
// ingest retried
95
Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet,
96
                               TTransactionId transaction_id, const PUniqueId& load_id,
97
0
                               bool ingest) {
98
    // check if the tablet has already been shutdown. If it has, it indicates that
99
    // it is an old tablet, and data should not be imported into the old tablet.
100
    // Otherwise, it may lead to data loss during migration.
101
0
    if (tablet.tablet_state() == TABLET_SHUTDOWN) {
102
0
        return Status::InternalError<false>(
103
0
                "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped "
104
0
                "or migrationed. Please check if the table has been dropped or try again.",
105
0
                tablet.tablet_id());
106
0
    }
107
0
    return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(),
108
0
                       load_id, ingest);
109
0
}
110
111
// most used for ut
112
Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id,
113
                               TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id,
114
30.1k
                               bool ingest) {
115
30.1k
    TxnKey key(partition_id, transaction_id);
116
30.1k
    TabletInfo tablet_info(tablet_id, tablet_uid);
117
30.1k
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
118
30.1k
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
119
120
30.1k
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", {
121
30.1k
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
122
30.1k
            LOG_WARNING("TxnManager.prepare_txn.random_failed random failed")
123
30.1k
                    .tag("txn_id", transaction_id)
124
30.1k
                    .tag("tablet_id", tablet_id);
125
30.1k
            return Status::InternalError("debug prepare txn random failed");
126
30.1k
        }
127
30.1k
    });
128
30.1k
    DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", {
129
30.1k
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
130
30.1k
            LOG_WARNING("TxnManager.prepare_txn.wait")
131
30.1k
                    .tag("txn_id", transaction_id)
132
30.1k
                    .tag("tablet_id", tablet_id)
133
30.1k
                    .tag("wait ms", wait);
134
30.1k
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
135
30.1k
        }
136
30.1k
    });
137
138
    /// Step 1: check if the transaction is already exist
139
30.1k
    do {
140
30.1k
        auto iter = txn_tablet_map.find(key);
141
30.1k
        if (iter == txn_tablet_map.end()) {
142
4.06k
            break;
143
4.06k
        }
144
145
        // exist TxnKey
146
26.0k
        auto& txn_tablet_info_map = iter->second;
147
26.0k
        auto load_itr = txn_tablet_info_map.find(tablet_info);
148
26.0k
        if (load_itr == txn_tablet_info_map.end()) {
149
26.0k
            break;
150
26.0k
        }
151
152
        // found load for txn,tablet
153
2
        auto& load_info = load_itr->second;
154
        // case 1: user commit rowset, then the load id must be equal
155
        // check if load id is equal
156
2
        if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() &&
157
2
            load_info->rowset != nullptr) {
158
1
            LOG(WARNING) << "find transaction exists when add to engine."
159
1
                         << "partition_id: " << key.first << ", transaction_id: " << key.second
160
1
                         << ", tablet: " << tablet_info.to_string();
161
1
            return Status::OK();
162
1
        }
163
2
    } while (false);
164
165
    /// Step 2: check if there are too many transactions on running.
166
    // check if there are too many transactions on running.
167
    // if yes, reject the request.
168
30.1k
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
169
30.1k
    if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) {
170
0
        return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}",
171
0
                                                    txn_tablet_map.size(),
172
0
                                                    config::max_runnings_transactions_per_txn_map);
173
0
    }
174
175
    /// Step 3: Add transaction to engine
176
    // not found load id
177
    // case 1: user start a new txn, rowset = null
178
    // case 2: loading txn from meta env
179
30.1k
    auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest);
180
30.1k
    load_info->prepare();
181
30.1k
    if (!txn_tablet_map.contains(key)) {
182
4.06k
        g_tablet_txn_info_txn_partitions_count << 1;
183
4.06k
    }
184
30.1k
    txn_tablet_map[key][tablet_info] = std::move(load_info);
185
30.1k
    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
186
30.1k
    VLOG_NOTICE << "add transaction to engine successfully."
187
11
                << "partition_id: " << key.first << ", transaction_id: " << key.second
188
11
                << ", tablet: " << tablet_info.to_string();
189
30.1k
    return Status::OK();
190
30.1k
}
191
192
Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet,
193
                              TTransactionId transaction_id, const PUniqueId& load_id,
194
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
195
                              bool is_recovery,
196
                              std::shared_ptr<PartialUpdateInfo> partial_update_info,
197
30.1k
                              std::vector<RowsetSharedPtr>* attach_rowsets) {
198
30.1k
    return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id,
199
30.1k
                      tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr,
200
30.1k
                      std::move(guard), is_recovery, partial_update_info, attach_rowsets);
201
30.1k
}
202
203
Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
204
                               TTransactionId transaction_id, const Version& version,
205
                               TabletPublishStatistics* stats,
206
                               std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
207
29.7k
                               const int64_t commit_tso) {
208
29.7k
    return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id,
209
29.7k
                       tablet->tablet_id(), tablet->tablet_uid(), version, stats,
210
29.7k
                       extend_tablet_txn_info, commit_tso);
211
29.7k
}
212
213
void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
214
0
                           TTabletId tablet_id, TabletUid tablet_uid) {
215
0
    pair<int64_t, int64_t> key(partition_id, transaction_id);
216
0
    TabletInfo tablet_info(tablet_id, tablet_uid);
217
218
0
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
219
220
0
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
221
0
    auto it = txn_tablet_map.find(key);
222
0
    if (it == txn_tablet_map.end()) {
223
0
        return;
224
0
    }
225
226
0
    auto& tablet_txn_info_map = it->second;
227
0
    auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
228
0
    if (tablet_txn_info_iter == tablet_txn_info_map.end()) {
229
0
        return;
230
0
    }
231
232
0
    auto& txn_info = tablet_txn_info_iter->second;
233
0
    txn_info->abort();
234
0
}
235
236
// delete the txn from manager if it is not committed(not have a valid rowset)
237
Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet,
238
6
                                TTransactionId transaction_id) {
239
6
    return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid());
240
6
}
241
242
Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
243
336
                              TTransactionId transaction_id) {
244
336
    return delete_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id,
245
336
                      tablet->tablet_id(), tablet->tablet_uid());
246
336
}
247
248
void TxnManager::set_txn_related_delete_bitmap(
249
        TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
250
        TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap,
251
        const RowsetIdUnorderedSet& rowset_ids,
252
16.1k
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
253
16.1k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
254
16.1k
    TabletInfo tablet_info(tablet_id, tablet_uid);
255
256
16.1k
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
257
16.1k
    {
258
        // get tx
259
16.1k
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
260
16.1k
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
261
16.1k
        auto it = txn_tablet_map.find(key);
262
16.1k
        if (it == txn_tablet_map.end()) {
263
1
            LOG(WARNING) << "transaction_id: " << transaction_id
264
1
                         << " partition_id: " << partition_id << " may be cleared";
265
1
            return;
266
1
        }
267
16.1k
        auto load_itr = it->second.find(tablet_info);
268
16.1k
        if (load_itr == it->second.end()) {
269
0
            LOG(WARNING) << "transaction_id: " << transaction_id
270
0
                         << " partition_id: " << partition_id << " tablet_id: " << tablet_id
271
0
                         << " may be cleared";
272
0
            return;
273
0
        }
274
16.1k
        auto& load_info = load_itr->second;
275
16.1k
        load_info->unique_key_merge_on_write = unique_key_merge_on_write;
276
16.1k
        load_info->delete_bitmap = delete_bitmap;
277
16.1k
        load_info->rowset_ids = rowset_ids;
278
16.1k
        load_info->partial_update_info = partial_update_info;
279
16.1k
    }
280
16.1k
}
281
282
Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
283
                              TTransactionId transaction_id, TTabletId tablet_id,
284
                              TabletUid tablet_uid, const PUniqueId& load_id,
285
                              const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard,
286
                              bool is_recovery,
287
                              std::shared_ptr<PartialUpdateInfo> partial_update_info,
288
30.2k
                              std::vector<RowsetSharedPtr>* attach_rowsets) {
289
30.2k
    if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
290
0
        LOG(WARNING) << "invalid commit req "
291
0
                     << " partition_id=" << partition_id << " transaction_id=" << transaction_id
292
0
                     << " tablet_id=" << tablet_id;
293
0
        return Status::InternalError("invalid partition id");
294
0
    }
295
296
30.2k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
297
30.2k
    TabletInfo tablet_info(tablet_id, tablet_uid);
298
30.2k
    if (rowset_ptr == nullptr) {
299
0
        return Status::Error<ROWSET_INVALID>(
300
0
                "could not commit txn because rowset ptr is null. partition_id: {}, "
301
0
                "transaction_id: {}, tablet: {}",
302
0
                key.first, key.second, tablet_info.to_string());
303
0
    }
304
305
30.2k
    DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", {
306
30.2k
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
307
30.2k
            LOG_WARNING("TxnManager.commit_txn.random_failed")
308
30.2k
                    .tag("txn_id", transaction_id)
309
30.2k
                    .tag("tablet_id", tablet_id);
310
30.2k
            return Status::InternalError("debug commit txn random failed");
311
30.2k
        }
312
30.2k
    });
313
30.2k
    DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", {
314
30.2k
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
315
30.2k
            LOG_WARNING("TxnManager.commit_txn.wait")
316
30.2k
                    .tag("txn_id", transaction_id)
317
30.2k
                    .tag("tablet_id", tablet_id)
318
30.2k
                    .tag("wait ms", wait);
319
30.2k
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
320
30.2k
        }
321
30.2k
    });
322
323
30.2k
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
324
    // this while loop just run only once, just for if break
325
30.2k
    do {
326
        // get tx
327
30.2k
        std::shared_lock rdlock(_get_txn_map_lock(transaction_id));
328
30.2k
        auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb();
329
        // TODO(dx): remove log after fix partition id eq 0 bug
330
30.2k
        if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) {
331
1
            rowset_ptr->rowset_meta()->set_partition_id(partition_id);
332
1
            LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id="
333
1
                         << partition_id;
334
1
        }
335
30.2k
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
336
30.2k
        auto it = txn_tablet_map.find(key);
337
30.2k
        if (it == txn_tablet_map.end()) {
338
24
            break;
339
24
        }
340
341
30.1k
        auto load_itr = it->second.find(tablet_info);
342
30.1k
        if (load_itr == it->second.end()) {
343
48
            break;
344
48
        }
345
346
        // found load for txn,tablet
347
        // case 1: user commit rowset, then the load id must be equal
348
30.1k
        auto& load_info = load_itr->second;
349
        // check if load id is equal
350
30.1k
        if (load_info->rowset == nullptr) {
351
30.1k
            break;
352
30.1k
        }
353
354
2
        if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) {
355
0
            break;
356
0
        }
357
358
        // find a rowset with same rowset id, then it means a duplicate call
359
2
        if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) {
360
1
            LOG(INFO) << "find rowset exists when commit transaction to engine."
361
1
                      << "partition_id: " << key.first << ", transaction_id: " << key.second
362
1
                      << ", tablet: " << tablet_info.to_string()
363
1
                      << ", rowset_id: " << load_info->rowset->rowset_id();
364
            // Should not remove this rowset from pending rowsets
365
1
            load_info->pending_rs_guard = std::move(guard);
366
1
            return Status::OK();
367
1
        }
368
369
        // find a rowset with different rowset id, then it should not happen, just return errors
370
1
        return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
371
1
                "find rowset exists when commit transaction to engine. but rowset ids are not "
372
1
                "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new "
373
1
                "rowset_id: {}",
374
1
                key.first, key.second, tablet_info.to_string(),
375
1
                load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string());
376
2
    } while (false);
377
378
    // if not in recovery mode, then should persist the meta to meta env
379
    // save meta need access disk, it maybe very slow, so that it is not in global txn lock
380
    // it is under a single txn lock
381
30.2k
    if (!is_recovery) {
382
30.1k
        std::optional<BinlogFormatPB> binlog_format;
383
30.1k
        std::map<RowsetId, RowsetMetaPB> attach_rowset_map;
384
30.1k
        if (attach_rowsets != nullptr) {
385
30.1k
            for (const auto& rs : *attach_rowsets) {
386
2
                if (rs == nullptr) {
387
0
                    continue;
388
0
                }
389
2
                attach_rowset_map.emplace(rs->rowset_id(), rs->rowset_meta()->get_rowset_pb());
390
2
            }
391
30.1k
            if (!attach_rowset_map.empty()) {
392
2
                binlog_format = BinlogFormatPB::ROW;
393
2
            }
394
30.1k
        }
395
30.1k
        Status save_status =
396
30.1k
                RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(),
397
30.1k
                                        rowset_ptr->rowset_meta()->get_rowset_pb(), binlog_format,
398
30.1k
                                        attach_rowset_map.empty() ? nullptr : &attach_rowset_map);
399
30.1k
        DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", {
400
30.1k
            if (auto wait = dp->param<int>("duration", 0); wait > 0) {
401
30.1k
                LOG_WARNING("TxnManager.RowsetMetaManager.save_wait")
402
30.1k
                        .tag("txn_id", transaction_id)
403
30.1k
                        .tag("tablet_id", tablet_id)
404
30.1k
                        .tag("wait ms", wait);
405
30.1k
                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
406
30.1k
            }
407
30.1k
        });
408
30.1k
        if (!save_status.ok()) {
409
0
            save_status.append(fmt::format(", txn id: {}", transaction_id));
410
0
            return save_status;
411
0
        }
412
413
30.1k
        if (partial_update_info && partial_update_info->is_partial_update()) {
414
328
            PartialUpdateInfoPB partial_update_info_pb;
415
328
            partial_update_info->to_pb(&partial_update_info_pb);
416
328
            save_status = RowsetMetaManager::save_partial_update_info(
417
328
                    meta, tablet_id, partition_id, transaction_id, partial_update_info_pb);
418
328
            if (!save_status.ok()) {
419
0
                save_status.append(fmt::format(", txn_id: {}", transaction_id));
420
0
                return save_status;
421
0
            }
422
328
        }
423
30.1k
    }
424
425
30.2k
    TabletSharedPtr tablet;
426
30.2k
    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
427
30.2k
    if (is_recovery) {
428
60
        tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
429
60
        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
430
0
            PartialUpdateInfoPB partial_update_info_pb;
431
0
            auto st = RowsetMetaManager::try_get_partial_update_info(
432
0
                    meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb);
433
0
            if (st.ok()) {
434
0
                decoded_partial_update_info = std::make_shared<PartialUpdateInfo>();
435
0
                decoded_partial_update_info->from_pb(&partial_update_info_pb);
436
0
                DCHECK(decoded_partial_update_info->is_partial_update());
437
0
            } else if (!st.is<META_KEY_NOT_FOUND>()) {
438
                // the load is not a partial update
439
0
                return st;
440
0
            }
441
0
        }
442
60
    }
443
444
30.2k
    {
445
30.2k
        std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
446
30.2k
        auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr);
447
30.2k
        if (attach_rowsets != nullptr) {
448
30.1k
            load_info->attach_rowsets = *attach_rowsets;
449
30.1k
        }
450
30.2k
        load_info->pending_rs_guard = std::move(guard);
451
30.2k
        if (is_recovery) {
452
60
            if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
453
0
                load_info->unique_key_merge_on_write = true;
454
0
                load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
455
0
                if (decoded_partial_update_info) {
456
0
                    LOG_INFO(
457
0
                            "get partial update info from RocksDB during recovery. txn_id={}, "
458
0
                            "partition_id={}, tablet_id={}, partial_update_info=[{}]",
459
0
                            transaction_id, partition_id, tablet_id,
460
0
                            decoded_partial_update_info->summary());
461
0
                    load_info->partial_update_info = decoded_partial_update_info;
462
0
                }
463
0
            }
464
60
        }
465
466
        // For binlog<Row> txn (attached rowsets exist), binlog_delvec is only needed for publish
467
        // phase to copy delete bitmap deltas.
468
30.2k
        if (attach_rowsets != nullptr && !attach_rowsets->empty()) {
469
2
            TabletSharedPtr t = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid);
470
2
            if (t != nullptr && t->enable_unique_key_merge_on_write()) {
471
0
                load_info->binlog_delvec.reset(new DeleteBitmap(t->tablet_id()));
472
0
            }
473
2
        }
474
30.2k
        load_info->commit();
475
476
30.2k
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
477
30.2k
        txn_tablet_map[key][tablet_info] = std::move(load_info);
478
30.2k
        _insert_txn_partition_map_unlocked(transaction_id, partition_id);
479
30.2k
        VLOG_NOTICE << "commit transaction to engine successfully."
480
17
                    << " partition_id: " << key.first << ", transaction_id: " << key.second
481
17
                    << ", tablet: " << tablet_info.to_string()
482
17
                    << ", rowsetid: " << rowset_ptr->rowset_id()
483
17
                    << ", version: " << rowset_ptr->version().first;
484
30.2k
    }
485
30.2k
    return Status::OK();
486
30.2k
}
487
488
// remove a txn from txn manager
489
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
490
                               TTransactionId transaction_id, TTabletId tablet_id,
491
                               TabletUid tablet_uid, const Version& version,
492
                               TabletPublishStatistics* stats,
493
                               std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info,
494
29.7k
                               const int64_t commit_tso) {
495
29.7k
    auto tablet = _engine.tablet_manager()->get_tablet(tablet_id);
496
29.7k
    if (tablet == nullptr) {
497
0
        return Status::OK();
498
0
    }
499
29.7k
    DCHECK(stats != nullptr);
500
501
29.7k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
502
29.7k
    TabletInfo tablet_info(tablet_id, tablet_uid);
503
29.7k
    RowsetSharedPtr rowset;
504
29.7k
    std::shared_ptr<TabletTxnInfo> tablet_txn_info;
505
29.7k
    int64_t t1 = MonotonicMicros();
506
    /// Step 1: get rowset, tablet_txn_info by key
507
29.7k
    {
508
29.7k
        std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
509
29.7k
        std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
510
29.7k
        stats->lock_wait_time_us += MonotonicMicros() - t1;
511
512
29.7k
        txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
513
29.7k
        if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
514
29.7k
            auto& tablet_map = it->second;
515
29.7k
            if (auto txn_info_iter = tablet_map.find(tablet_info);
516
29.7k
                txn_info_iter != tablet_map.end()) {
517
                // found load for txn,tablet
518
                // case 1: user commit rowset, then the load id must be equal
519
29.7k
                tablet_txn_info = txn_info_iter->second;
520
29.7k
                extend_tablet_txn_info = tablet_txn_info;
521
29.7k
                rowset = tablet_txn_info->rowset;
522
29.7k
            }
523
29.7k
        }
524
29.7k
    }
525
29.7k
    if (rowset == nullptr) {
526
1
        return Status::Error<TRANSACTION_NOT_EXIST>(
527
1
                "publish txn failed, rowset not found. partition_id={}, transaction_id={}, "
528
1
                "tablet={}, commit_tso={}",
529
1
                partition_id, transaction_id, tablet_info.to_string(), commit_tso);
530
1
    }
531
29.7k
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", {
532
29.7k
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
533
29.7k
            LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta")
534
29.7k
                    .tag("txn_id", transaction_id)
535
29.7k
                    .tag("tablet_id", tablet_id);
536
29.7k
            return Status::InternalError("debug publish txn before save rs meta random failed");
537
29.7k
        }
538
29.7k
    });
539
29.7k
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", {
540
29.7k
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
541
29.7k
            LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta")
542
29.7k
                    .tag("txn_id", transaction_id)
543
29.7k
                    .tag("tablet_id", tablet_id)
544
29.7k
                    .tag("wait ms", wait);
545
29.7k
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
546
29.7k
        }
547
29.7k
    });
548
549
    /// Step 2: make rowset visible
550
    // save meta need access disk, it maybe very slow, so that it is not in global txn lock
551
    // it is under a single txn lock
552
    // TODO(ygl): rowset is already set version here, memory is changed, if save failed
553
    // it maybe a fatal error
554
29.7k
    rowset->make_visible(version, commit_tso);
555
556
    // Make all attached rowsets visible together.
557
29.7k
    for (const auto& rs : tablet_txn_info->attach_rowsets) {
558
1
        if (rs == nullptr) {
559
0
            continue;
560
0
        }
561
1
        rs->make_visible(version, commit_tso);
562
1
    }
563
564
29.7k
    DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", {
565
29.7k
        if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
566
29.7k
            LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta")
567
29.7k
                    .tag("txn_id", transaction_id)
568
29.7k
                    .tag("tablet_id", tablet_id);
569
29.7k
            return Status::InternalError("debug publish txn after save rs meta random failed");
570
29.7k
        }
571
29.7k
    });
572
29.7k
    DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", {
573
29.7k
        if (auto wait = dp->param<int>("duration", 0); wait > 0) {
574
29.7k
            LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta")
575
29.7k
                    .tag("txn_id", transaction_id)
576
29.7k
                    .tag("tablet_id", tablet_id)
577
29.7k
                    .tag("wait ms", wait);
578
29.7k
            std::this_thread::sleep_for(std::chrono::milliseconds(wait));
579
29.7k
        }
580
29.7k
    });
581
    // update delete_bitmap
582
29.7k
    if (tablet_txn_info->unique_key_merge_on_write) {
583
16.0k
        int64_t t2 = MonotonicMicros();
584
16.0k
        if (rowset->num_segments() > 1 &&
585
16.0k
            !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments(
586
0
                    rowset->rowset_id())) {
587
            // delete bitmap is empty, should re-calculate delete bitmaps between segments
588
0
            std::vector<segment_v2::SegmentSharedPtr> segments;
589
0
            RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
590
0
            RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
591
0
                    rowset->tablet_schema(), rowset->rowset_id(), segments,
592
0
                    tablet_txn_info->delete_bitmap));
593
0
        }
594
595
16.0k
        RETURN_IF_ERROR(
596
16.0k
                Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id));
597
16.0k
        int64_t t3 = MonotonicMicros();
598
16.0k
        stats->calc_delete_bitmap_time_us = t3 - t2;
599
16.0k
        RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap(
600
16.0k
                tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap,
601
16.0k
                tablet_txn_info->binlog_delvec, version.second));
602
16.0k
        stats->save_meta_time_us = MonotonicMicros() - t3;
603
16.0k
    }
604
605
    /// Step 3:  add to binlog
606
29.7k
    std::optional<BinlogFormatPB> binlog_format;
607
29.7k
    if (tablet->enable_ccr_binlog()) {
608
0
        binlog_format = BinlogFormatPB::STATEMENT_AND_SNAPSHOT;
609
0
        auto status = rowset->add_to_binlog();
610
0
        if (!status.ok()) {
611
0
            return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>(
612
0
                    "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, "
613
0
                    "txn id: {}, status: {}",
614
0
                    rowset->rowset_id().to_string(), tablet_id, transaction_id,
615
0
                    status.to_string_no_stack());
616
0
        }
617
0
    }
618
619
29.7k
    std::map<RowsetId, RowsetMetaPB> attach_rowset_map;
620
29.7k
    if (tablet_txn_info != nullptr) {
621
29.6k
        for (const auto& rs : tablet_txn_info->attach_rowsets) {
622
1
            if (rs == nullptr) {
623
0
                continue;
624
0
            }
625
1
            attach_rowset_map.emplace(rs->rowset_id(), rs->rowset_meta()->get_rowset_pb());
626
1
        }
627
29.6k
        if (!attach_rowset_map.empty()) {
628
1
            binlog_format = BinlogFormatPB::ROW;
629
1
        }
630
29.6k
    }
631
632
    /// Step 4: save meta
633
29.7k
    int64_t t5 = MonotonicMicros();
634
29.7k
    auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(),
635
29.7k
                                          rowset->rowset_meta()->get_rowset_pb(), binlog_format,
636
29.7k
                                          attach_rowset_map.empty() ? nullptr : &attach_rowset_map);
637
29.7k
    stats->save_meta_time_us += MonotonicMicros() - t5;
638
29.7k
    if (!status.ok()) {
639
0
        status.append(fmt::format(", txn id: {}", transaction_id));
640
0
        return status;
641
0
    }
642
643
29.7k
    if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info &&
644
29.7k
        tablet_txn_info->partial_update_info->is_partial_update()) {
645
328
        status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id,
646
328
                                                               transaction_id);
647
328
        if (!status) {
648
            // discard the error status and print the warning log
649
0
            LOG_WARNING(
650
0
                    "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, "
651
0
                    "tablet_id={}, tablet_uid={}",
652
0
                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
653
0
                    tablet_uid.to_string());
654
0
        }
655
328
    }
656
657
    // TODO(Drogon): remove these test codes
658
29.7k
    if (tablet->enable_ccr_binlog()) {
659
0
        auto version_str = fmt::format("{}", version.first);
660
0
        VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id,
661
0
                                  version_str, tablet->get_binlog_filepath(version_str));
662
0
    }
663
664
    /// Step 5: remove tablet_info from txn_tablet_map
665
    // txn_tablet_map[key] empty, remove key from txn_tablet_map
666
29.7k
    int64_t t6 = MonotonicMicros();
667
29.7k
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
668
29.7k
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
669
29.7k
    stats->lock_wait_time_us += MonotonicMicros() - t6;
670
29.7k
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
671
29.7k
                                     wrlock);
672
18.4E
    VLOG_NOTICE << "publish txn successfully."
673
18.4E
                << " partition_id: " << key.first << ", txn_id: " << key.second
674
18.4E
                << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id()
675
18.4E
                << ", version: " << version.first << "," << version.second;
676
29.7k
    return status;
677
29.7k
}
678
679
void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id,
680
                                                  TTransactionId transaction_id,
681
                                                  TTabletId tablet_id, TabletUid tablet_uid,
682
                                                  std::lock_guard<std::shared_mutex>& txn_lock,
683
29.7k
                                                  std::lock_guard<std::shared_mutex>& wrlock) {
684
29.7k
    std::pair<int64_t, int64_t> key {partition_id, transaction_id};
685
29.7k
    TabletInfo tablet_info {tablet_id, tablet_uid};
686
29.7k
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
687
29.7k
    if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
688
29.7k
        it->second.erase(tablet_info);
689
29.7k
        if (it->second.empty()) {
690
4.03k
            txn_tablet_map.erase(it);
691
4.03k
            g_tablet_txn_info_txn_partitions_count << -1;
692
4.03k
            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
693
4.03k
        }
694
29.7k
    }
695
29.7k
}
696
697
void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id,
698
0
                                        TTabletId tablet_id, TabletUid tablet_uid) {
699
0
    std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
700
0
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
701
0
    _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock,
702
0
                                     wrlock);
703
0
}
704
705
// txn could be rollbacked if it does not have related rowset
706
// if the txn has related rowset then could not rollback it, because it
707
// may be committed in another thread and our current thread meets errors when writing to data file
708
// BE has to wait for fe call clear txn api
709
Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
710
8
                                TTabletId tablet_id, TabletUid tablet_uid) {
711
8
    pair<int64_t, int64_t> key(partition_id, transaction_id);
712
8
    TabletInfo tablet_info(tablet_id, tablet_uid);
713
714
8
    std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
715
8
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
716
717
8
    auto it = txn_tablet_map.find(key);
718
8
    if (it == txn_tablet_map.end()) {
719
0
        return Status::OK();
720
0
    }
721
722
8
    auto& tablet_txn_info_map = it->second;
723
8
    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
724
8
        load_itr != tablet_txn_info_map.end()) {
725
        // found load for txn,tablet
726
        // case 1: user commit rowset, then the load id must be equal
727
8
        const auto& load_info = load_itr->second;
728
8
        if (load_info->rowset != nullptr) {
729
1
            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
730
1
                    "if rowset is not null, it means other thread may commit the rowset should "
731
1
                    "not delete txn any more");
732
1
        }
733
8
    }
734
735
7
    tablet_txn_info_map.erase(tablet_info);
736
7
    LOG(INFO) << "rollback transaction from engine successfully."
737
7
              << " partition_id: " << key.first << ", transaction_id: " << key.second
738
7
              << ", tablet: " << tablet_info.to_string();
739
7
    if (tablet_txn_info_map.empty()) {
740
7
        txn_tablet_map.erase(it);
741
7
        g_tablet_txn_info_txn_partitions_count << -1;
742
7
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
743
7
    }
744
7
    return Status::OK();
745
8
}
746
747
// fe call this api to clear unused rowsets in be
748
// could not delete the rowset if it already has a valid version
749
Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
750
                              TTransactionId transaction_id, TTabletId tablet_id,
751
341
                              TabletUid tablet_uid) {
752
341
    pair<int64_t, int64_t> key(partition_id, transaction_id);
753
341
    TabletInfo tablet_info(tablet_id, tablet_uid);
754
341
    std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id));
755
341
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
756
341
    auto it = txn_tablet_map.find(key);
757
341
    if (it == txn_tablet_map.end()) {
758
0
        return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map");
759
0
    }
760
341
    Status st = Status::OK();
761
341
    auto load_itr = it->second.find(tablet_info);
762
341
    if (load_itr != it->second.end()) {
763
        // found load for txn,tablet
764
        // case 1: user commit rowset, then the load id must be equal
765
341
        auto& load_info = load_itr->second;
766
341
        auto& rowset = load_info->rowset;
767
341
        if (rowset != nullptr && meta != nullptr) {
768
340
            if (!rowset->is_pending()) {
769
1
                st = Status::Error<TRANSACTION_ALREADY_COMMITTED>(
770
1
                        "could not delete transaction from engine, just remove it from memory not "
771
1
                        "delete from disk, because related rowset already published. partition_id: "
772
1
                        "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
773
1
                        key.first, key.second, tablet_info.to_string(),
774
1
                        rowset->rowset_id().to_string(), rowset->version().to_string(),
775
1
                        RowsetStatePB_Name(rowset->rowset_meta_state()));
776
339
            } else {
777
339
                for (const auto& attach_rowset : load_info->attach_rowsets) {
778
1
                    static_cast<void>(RowsetMetaManager::remove_row_binlog(
779
1
                            meta, tablet_uid, rowset->rowset_id(), attach_rowset->rowset_id()));
780
1
                    _engine.add_unused_rowset(attach_rowset);
781
1
                }
782
339
                static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
783
339
#ifndef BE_TEST
784
339
                _engine.add_unused_rowset(rowset);
785
339
#endif
786
339
                VLOG_NOTICE << "delete transaction from engine successfully."
787
3
                            << " partition_id: " << key.first << ", transaction_id: " << key.second
788
3
                            << ", tablet: " << tablet_info.to_string() << ", rowset: "
789
3
                            << (rowset != nullptr ? rowset->rowset_id().to_string() : "0")
790
3
                            << ", binlog<row> rowset: "
791
3
                            << (load_info->attach_rowsets.empty()
792
3
                                        ? "0"
793
3
                                        : load_info->attach_rowsets[0]->rowset_id().to_string());
794
339
            }
795
340
        }
796
341
        it->second.erase(load_itr);
797
341
    }
798
341
    if (it->second.empty()) {
799
21
        txn_tablet_map.erase(it);
800
21
        g_tablet_txn_info_txn_partitions_count << -1;
801
21
        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
802
21
    }
803
341
    return st;
804
341
}
805
806
void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid,
807
                                         int64_t* partition_id,
808
6
                                         std::set<int64_t>* transaction_ids) {
809
6
    if (partition_id == nullptr || transaction_ids == nullptr) {
810
0
        LOG(WARNING) << "parameter is null when get transactions by tablet";
811
0
        return;
812
0
    }
813
814
6
    TabletInfo tablet_info(tablet_id, tablet_uid);
815
12
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
816
6
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
817
6
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
818
6
        for (auto& it : txn_tablet_map) {
819
1
            if (it.second.find(tablet_info) != it.second.end()) {
820
1
                *partition_id = it.first.first;
821
1
                transaction_ids->insert(it.first.second);
822
1
                VLOG_NOTICE << "find transaction on tablet."
823
1
                            << "partition_id: " << it.first.first
824
1
                            << ", transaction_id: " << it.first.second
825
1
                            << ", tablet: " << tablet_info.to_string();
826
1
            }
827
1
        }
828
6
    }
829
6
}
830
831
// force drop all txns related with the tablet
832
// maybe lock error, because not get txn lock before remove from meta
833
void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id,
834
4.61k
                                                    TabletUid tablet_uid) {
835
4.61k
    TabletInfo tablet_info(tablet_id, tablet_uid);
836
4.59M
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
837
4.59M
        std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]);
838
4.59M
        txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i];
839
4.59M
        for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) {
840
8.47k
            auto load_itr = it->second.find(tablet_info);
841
8.47k
            if (load_itr != it->second.end()) {
842
22
                auto& load_info = load_itr->second;
843
22
                auto& rowset = load_info->rowset;
844
22
                if (rowset != nullptr && meta != nullptr) {
845
22
                    LOG(INFO) << " delete transaction from engine "
846
22
                              << ", tablet: " << tablet_info.to_string()
847
22
                              << ", rowset id: " << rowset->rowset_id();
848
                    // clean attach rowset first
849
22
                    for (const auto& attach_rowset : load_info->attach_rowsets) {
850
0
                        Status status = RowsetMetaManager::remove_row_binlog(
851
0
                                meta, tablet_uid, rowset->rowset_id(), attach_rowset->rowset_id());
852
0
                        if (!status.ok()) {
853
0
                            if (status.is<META_KEY_NOT_FOUND>()) {
854
0
                                continue;
855
0
                            }
856
0
                        }
857
0
                    }
858
22
                    static_cast<void>(
859
22
                            RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
860
22
                }
861
22
                LOG(INFO) << "remove tablet related txn."
862
22
                          << " partition_id: " << it->first.first
863
22
                          << ", transaction_id: " << it->first.second
864
22
                          << ", tablet: " << tablet_info.to_string() << ", rowset: "
865
22
                          << (rowset != nullptr ? rowset->rowset_id().to_string() : "0")
866
22
                          << ", binlog<row> rowset: "
867
22
                          << (load_info->attach_rowsets.empty()
868
22
                                      ? "0"
869
22
                                      : load_info->attach_rowsets[0]->rowset_id().to_string());
870
22
                it->second.erase(load_itr);
871
22
            }
872
8.47k
            if (it->second.empty()) {
873
4
                _clear_txn_partition_map_unlocked(it->first.second, it->first.first);
874
4
                it = txn_tablet_map.erase(it);
875
4
                g_tablet_txn_info_txn_partitions_count << -1;
876
8.46k
            } else {
877
8.46k
                ++it;
878
8.46k
            }
879
8.47k
        }
880
4.59M
    }
881
4.61k
    if (meta != nullptr) {
882
4.61k
        Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
883
4.61k
        if (!st.ok()) {
884
0
            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id,
885
0
                        st.to_string());
886
0
        }
887
4.61k
    }
888
4.61k
}
889
890
void TxnManager::get_txn_related_tablets(
891
        const TTransactionId transaction_id, TPartitionId partition_id,
892
        std::map<TabletInfo, RowsetSharedPtr>* tablet_infos,
893
4.07k
        std::map<TabletInfo, std::vector<RowsetSharedPtr>>* tablet_attach_rowsets) {
894
    // get tablets in this transaction
895
4.07k
    pair<int64_t, int64_t> key(partition_id, transaction_id);
896
4.07k
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
897
4.07k
    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
898
4.07k
    auto it = txn_tablet_map.find(key);
899
4.07k
    if (it == txn_tablet_map.end()) {
900
2
        VLOG_NOTICE << "could not find tablet for"
901
1
                    << " partition_id=" << partition_id << ", transaction_id=" << transaction_id;
902
2
        return;
903
2
    }
904
4.07k
    auto& load_info_map = it->second;
905
906
    // each tablet
907
30.1k
    for (auto& load_info : load_info_map) {
908
30.1k
        const TabletInfo& tablet_info = load_info.first;
909
        // must not check rowset == null here, because if rowset == null
910
        // publish version should failed
911
30.1k
        tablet_infos->emplace(tablet_info, load_info.second->rowset);
912
30.1k
        if (tablet_attach_rowsets != nullptr) {
913
29.7k
            tablet_attach_rowsets->emplace(tablet_info, load_info.second->attach_rowsets);
914
29.7k
        }
915
30.1k
    }
916
4.07k
}
917
918
54
void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
919
55.3k
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
920
55.2k
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
921
55.2k
        for (auto& it : _txn_tablet_maps[i]) {
922
960
            for (auto& tablet_load_it : it.second) {
923
960
                tablet_infos->emplace(tablet_load_it.first);
924
960
            }
925
64
        }
926
55.2k
    }
927
54
}
928
929
void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
930
1.91k
        const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) {
931
1.93M
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
932
1.93M
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
933
1.93M
        for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) {
934
1.12k
            auto tablet_load_it = load_info_map.find(tablet.get_tablet_info());
935
1.12k
            if (tablet_load_it != load_info_map.end()) {
936
42
                const auto& [_, load_info] = *tablet_load_it;
937
42
                const auto& rowset = load_info->rowset;
938
42
                const auto& delete_bitmap = load_info->delete_bitmap;
939
42
                if (!rowset || !delete_bitmap) {
940
25
                    continue;
941
25
                }
942
17
                commit_tablet_txn_info_vec->push_back({
943
17
                        .transaction_id = txn_key.second,
944
17
                        .partition_id = txn_key.first,
945
17
                        .delete_bitmap = delete_bitmap,
946
17
                        .rowset_ids = load_info->rowset_ids,
947
17
                        .partial_update_info = load_info->partial_update_info,
948
17
                });
949
17
            }
950
1.12k
        }
951
1.93M
    }
952
1.91k
}
953
954
147
void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
955
147
    int64_t now = UnixSeconds();
956
    // traverse the txn map, and get all expired txns
957
150k
    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
958
150k
        std::shared_lock txn_rdlock(_txn_map_locks[i]);
959
150k
        for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) {
960
190
            auto txn_id = txn_key.second;
961
2.93k
            for (auto&& [tablet_info, txn_info] : tablet_txn_infos) {
962
2.93k
                double diff = difftime(now, txn_info->creation_time);
963
2.93k
                if (diff < config::pending_data_expire_time_sec) {
964
2.84k
                    continue;
965
2.84k
                }
966
967
84
                (*expire_txn_map)[tablet_info].push_back(txn_id);
968
84
                if (VLOG_IS_ON(3)) {
969
0
                    VLOG_NOTICE << "find expired txn."
970
0
                                << " tablet=" << tablet_info.to_string()
971
0
                                << " transaction_id=" << txn_id << " exist_sec=" << diff;
972
0
                }
973
84
            }
974
190
        }
975
150k
    }
976
147
}
977
978
void TxnManager::get_partition_ids(const TTransactionId transaction_id,
979
20
                                   std::vector<TPartitionId>* partition_ids) {
980
20
    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
981
20
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
982
20
    auto it = txn_partition_map.find(transaction_id);
983
20
    if (it != txn_partition_map.end()) {
984
13
        for (int64_t partition_id : it->second) {
985
13
            partition_ids->push_back(partition_id);
986
13
        }
987
13
    }
988
20
}
989
990
60.3k
void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
991
60.3k
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
992
60.3k
    auto find = txn_partition_map.find(transaction_id);
993
60.3k
    if (find == txn_partition_map.end()) {
994
4.06k
        txn_partition_map[transaction_id] = std::unordered_set<int64_t>();
995
4.06k
    }
996
60.3k
    txn_partition_map[transaction_id].insert(partition_id);
997
60.3k
}
998
999
4.06k
void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
1000
4.06k
    txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id);
1001
4.06k
    auto it = txn_partition_map.find(transaction_id);
1002
4.06k
    if (it != txn_partition_map.end()) {
1003
4.06k
        it->second.erase(partition_id);
1004
4.06k
        if (it->second.empty()) {
1005
4.03k
            txn_partition_map.erase(it);
1006
4.03k
        }
1007
4.06k
    }
1008
4.06k
}
1009
1010
void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id,
1011
0
                                             DeltaWriter* delta_writer) {
1012
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
1013
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
1014
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
1015
0
            _get_txn_tablet_delta_writer_map(transaction_id);
1016
0
    auto find = txn_tablet_delta_writer_map.find(transaction_id);
1017
0
    if (find == txn_tablet_delta_writer_map.end()) {
1018
0
        txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>();
1019
0
    }
1020
0
    txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer;
1021
0
}
1022
1023
void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id,
1024
0
                                                 int64_t node_id, bool is_succeed) {
1025
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
1026
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
1027
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
1028
0
            _get_txn_tablet_delta_writer_map(transaction_id);
1029
0
    auto find_txn = txn_tablet_delta_writer_map.find(transaction_id);
1030
0
    if (find_txn == txn_tablet_delta_writer_map.end()) {
1031
0
        LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id
1032
0
                     << ", tablet_id=" << tablet_id;
1033
0
        return;
1034
0
    }
1035
0
    auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id);
1036
0
    if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) {
1037
0
        LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id
1038
0
                     << ", tablet_id=" << tablet_id;
1039
0
        return;
1040
0
    }
1041
0
    DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id];
1042
0
    delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed);
1043
0
}
1044
1045
0
void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
1046
0
    std::lock_guard<std::shared_mutex> txn_wrlock(
1047
0
            _get_txn_tablet_delta_writer_map_lock(transaction_id));
1048
0
    txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map =
1049
0
            _get_txn_tablet_delta_writer_map(transaction_id);
1050
0
    auto it = txn_tablet_delta_writer_map.find(transaction_id);
1051
0
    if (it != txn_tablet_delta_writer_map.end()) {
1052
0
        txn_tablet_delta_writer_map.erase(it);
1053
0
    }
1054
0
    VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
1055
0
}
1056
1057
16.0k
int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) {
1058
16.0k
    char key[16];
1059
16.0k
    memcpy(key, &tablet_id, sizeof(int64_t));
1060
16.0k
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
1061
16.0k
    CacheKey cache_key((const char*)&key, sizeof(key));
1062
1063
16.0k
    auto* handle = _tablet_version_cache->lookup(cache_key);
1064
16.0k
    if (handle == nullptr) {
1065
16.0k
        return -1;
1066
16.0k
    }
1067
21
    int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value;
1068
21
    _tablet_version_cache->release(handle);
1069
21
    return res;
1070
16.0k
}
1071
1072
16.0k
void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) {
1073
16.0k
    char key[16];
1074
16.0k
    memcpy(key, &tablet_id, sizeof(int64_t));
1075
16.0k
    memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
1076
16.0k
    CacheKey cache_key((const char*)&key, sizeof(key));
1077
1078
16.0k
    auto* value = new CacheValue;
1079
16.0k
    value->value = txn_id;
1080
16.0k
    auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id),
1081
16.0k
                                                 CachePriority::NORMAL);
1082
16.0k
    _tablet_version_cache->release(handle);
1083
16.0k
}
1084
1085
// Returns the state recorded for the given tablet within (partition_id, transaction_id),
// or TxnState::NOT_FOUND when either the txn/partition key or the tablet entry is missing.
// Reads the txn map under a shared lock.
TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
                                   TTabletId tablet_id, TabletUid tablet_uid) {
    std::pair<int64_t, int64_t> key(partition_id, transaction_id);
    TabletInfo tablet_info(tablet_id, tablet_uid);

    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);

    if (auto txn_it = txn_tablet_map.find(key); txn_it != txn_tablet_map.end()) {
        auto& tablets = txn_it->second;
        if (auto tablet_it = tablets.find(tablet_info); tablet_it != tablets.end()) {
            return tablet_it->second->state;
        }
    }
    return TxnState::NOT_FOUND;
}
1107
1108
} // namespace doris