Coverage Report

Created: 2026-03-17 18:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_committed_rs_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_committed_rs_mgr.h"
19
20
#include <chrono>
21
22
#include "cloud/config.h"
23
#include "common/logging.h"
24
#include "storage/rowset/rowset_meta.h"
25
#include "util/thread.h"
26
27
namespace doris {
28
#include "common/compile_check_begin.h"
29
12
// Constructs the manager with its stop latch initialised to a count of 1.
// No background thread is started here.
CloudCommittedRSMgr::CloudCommittedRSMgr()
        : _stop_latch(1) {
}
30
31
12
// Counts the stop latch down and, when a cleaner thread handle exists,
// waits for that thread to finish.
CloudCommittedRSMgr::~CloudCommittedRSMgr() {
    // Count the latch down before joining so the cleaner loop can observe it.
    _stop_latch.count_down();

    if (!_clean_thread) {
        // No thread handle was ever stored; nothing to join.
        return;
    }
    _clean_thread->join();
}
37
38
0
// Creates the cleaner thread that runs _clean_thread_callback() and stores its
// handle in _clean_thread.
//
// Returns the status reported by Thread::create; a failure is logged as a
// warning before being returned.
Status CloudCommittedRSMgr::init() {
    auto cleaner_entry = [this]() { this->_clean_thread_callback(); };
    auto create_status = Thread::create("CloudCommittedRSMgr", "clean_committed_rs_thread",
                                        cleaner_entry, &_clean_thread);
    if (create_status.ok()) {
        return create_status;
    }

    LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << create_status;
    return create_status;
}
47
48
void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id,
49
                                               RowsetMetaSharedPtr rowset_meta,
50
29
                                               int64_t expiration_time) {
51
29
    int64_t txn_expiration_min =
52
29
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
53
29
                    .count() +
54
29
            config::tablet_txn_info_min_expired_seconds;
55
29
    expiration_time = std::max(txn_expiration_min, expiration_time);
56
29
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
57
29
    TxnTabletKey key(txn_id, tablet_id);
58
29
    _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time));
59
29
    _expiration_map.emplace(expiration_time, key);
60
29
    LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id
61
29
              << ", rowset_id=" << rowset_meta->rowset_id().to_string()
62
29
              << ", expiration_time=" << expiration_time;
63
29
}
64
65
// Looks up what is recorded for (txn_id, tablet_id) under a shared lock.
//
// Returns, in order of precedence:
//   * {nullptr, expiration} when the key is present in _empty_rowset_markers;
//   * {rowset_meta, expiration} when the key is present in _committed_rs_map;
//   * a NOT_FOUND error otherwise.
Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset(
        int64_t txn_id, int64_t tablet_id) {
    const TxnTabletKey lookup_key(txn_id, tablet_id);
    std::shared_lock<std::shared_mutex> read_guard(_rwlock);

    // Empty-rowset markers are consulted first and win over a stored rowset.
    const auto marker_it = _empty_rowset_markers.find(lookup_key);
    if (marker_it != _empty_rowset_markers.end()) {
        return std::make_pair(nullptr, marker_it->second);
    }

    if (auto rs_it = _committed_rs_map.find(lookup_key); rs_it != _committed_rs_map.end()) {
        return std::make_pair(rs_it->second.rowset_meta, rs_it->second.expiration_time);
    }

    return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
            "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id));
}
79
80
1
void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) {
81
1
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
82
1
    _committed_rs_map.erase({txn_id, tablet_id});
83
1
}
84
85
3
void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
86
3
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
87
3
    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
88
3
                                   std::chrono::system_clock::now().time_since_epoch())
89
3
                                   .count();
90
91
7
    while (!_expiration_map.empty()) {
92
6
        auto iter = _expiration_map.begin();
93
6
        if (!_committed_rs_map.contains(iter->second) &&
94
6
            !_empty_rowset_markers.contains(iter->second)) {
95
0
            _expiration_map.erase(iter);
96
0
            continue;
97
0
        }
98
6
        int64_t expiration_time = iter->first;
99
6
        if (expiration_time > current_time) {
100
2
            break;
101
2
        }
102
103
4
        auto key = iter->second;
104
4
        _expiration_map.erase(iter);
105
106
4
        auto it_rs = _committed_rs_map.find(key);
107
4
        if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time == expiration_time) {
108
2
            _committed_rs_map.erase(it_rs);
109
2
            LOG(INFO) << "clean expired pending cloud rowset, txn_id=" << key.txn_id
110
2
                      << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time;
111
2
        }
112
4
        auto it_empty = _empty_rowset_markers.find(key);
113
4
        if (it_empty != _empty_rowset_markers.end() && it_empty->second == expiration_time) {
114
2
            _empty_rowset_markers.erase(it_empty);
115
2
            LOG(INFO) << "clean expired empty rowset marker, txn_id=" << key.txn_id
116
2
                      << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time;
117
2
        }
118
4
    }
119
3
}
120
121
void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id,
122
9
                                            int64_t txn_expiration) {
123
9
    int64_t txn_expiration_min =
124
9
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
125
9
                    .count() +
126
9
            config::tablet_txn_info_min_expired_seconds;
127
9
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
128
129
9
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
130
9
    TxnTabletKey txn_key(txn_id, tablet_id);
131
9
    _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration);
132
9
    _expiration_map.emplace(txn_expiration, txn_key);
133
9
}
134
135
0
void CloudCommittedRSMgr::_clean_thread_callback() {
136
0
    do {
137
0
        remove_expired_committed_rowsets();
138
0
    } while (!_stop_latch.wait_for(
139
0
            std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
140
0
}
141
#include "common/compile_check_end.h"
142
} // namespace doris