Coverage Report

Created: 2026-04-14 13:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_committed_rs_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_committed_rs_mgr.h"
19
20
#include <chrono>
21
22
#include "cloud/config.h"
23
#include "common/logging.h"
24
#include "storage/rowset/rowset_meta.h"
25
#include "util/thread.h"
26
27
namespace doris {
28
13
CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {}
29
30
12
CloudCommittedRSMgr::~CloudCommittedRSMgr() {
31
12
    _stop_latch.count_down();
32
12
    if (_clean_thread) {
33
0
        _clean_thread->join();
34
0
    }
35
12
}
36
37
1
Status CloudCommittedRSMgr::init() {
38
1
    auto st = Thread::create(
39
1
            "CloudCommittedRSMgr", "clean_committed_rs_thread",
40
1
            [this]() { this->_clean_thread_callback(); }, &_clean_thread);
41
1
    if (!st.ok()) {
42
0
        LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << st;
43
0
    }
44
1
    return st;
45
1
}
46
47
void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id,
48
                                               RowsetMetaSharedPtr rowset_meta,
49
35.2k
                                               int64_t expiration_time) {
50
35.2k
    int64_t txn_expiration_min =
51
35.2k
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
52
35.2k
                    .count() +
53
35.2k
            config::tablet_txn_info_min_expired_seconds;
54
35.2k
    expiration_time = std::max(txn_expiration_min, expiration_time);
55
35.2k
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
56
35.2k
    TxnTabletKey key(txn_id, tablet_id);
57
35.2k
    _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time));
58
35.2k
    _expiration_map.emplace(expiration_time, key);
59
35.2k
    LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id
60
35.2k
              << ", rowset_id=" << rowset_meta->rowset_id().to_string()
61
35.2k
              << ", expiration_time=" << expiration_time;
62
35.2k
}
63
64
Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset(
65
118k
        int64_t txn_id, int64_t tablet_id) {
66
118k
    std::shared_lock<std::shared_mutex> rlock(_rwlock);
67
118k
    TxnTabletKey key(txn_id, tablet_id);
68
118k
    if (auto it = _empty_rowset_markers.find(key); it != _empty_rowset_markers.end()) {
69
83.8k
        return std::make_pair(nullptr, it->second);
70
83.8k
    }
71
34.6k
    auto iter = _committed_rs_map.find(key);
72
34.6k
    if (iter == _committed_rs_map.end()) {
73
7
        return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
74
7
                "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id));
75
7
    }
76
34.6k
    return std::make_pair(iter->second.rowset_meta, iter->second.expiration_time);
77
34.6k
}
78
79
118k
void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) {
80
118k
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
81
118k
    _committed_rs_map.erase({txn_id, tablet_id});
82
118k
}
83
84
17
void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
85
17
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
86
17
    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
87
17
                                   std::chrono::system_clock::now().time_since_epoch())
88
17
                                   .count();
89
90
1.61k
    while (!_expiration_map.empty()) {
91
1.61k
        auto iter = _expiration_map.begin();
92
1.61k
        if (!_committed_rs_map.contains(iter->second) &&
93
1.61k
            !_empty_rowset_markers.contains(iter->second)) {
94
1.49k
            _expiration_map.erase(iter);
95
1.49k
            continue;
96
1.49k
        }
97
114
        int64_t expiration_time = iter->first;
98
114
        if (expiration_time > current_time) {
99
15
            break;
100
15
        }
101
102
99
        auto key = iter->second;
103
99
        _expiration_map.erase(iter);
104
105
99
        auto it_rs = _committed_rs_map.find(key);
106
99
        if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time == expiration_time) {
107
16
            _committed_rs_map.erase(it_rs);
108
16
            LOG(INFO) << "clean expired pending cloud rowset, txn_id=" << key.txn_id
109
16
                      << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time;
110
16
        }
111
99
        auto it_empty = _empty_rowset_markers.find(key);
112
99
        if (it_empty != _empty_rowset_markers.end() && it_empty->second == expiration_time) {
113
83
            _empty_rowset_markers.erase(it_empty);
114
83
            LOG(INFO) << "clean expired empty rowset marker, txn_id=" << key.txn_id
115
83
                      << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time;
116
83
        }
117
99
    }
118
17
}
119
120
void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id,
121
85.7k
                                            int64_t txn_expiration) {
122
85.7k
    int64_t txn_expiration_min =
123
85.7k
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
124
85.7k
                    .count() +
125
85.7k
            config::tablet_txn_info_min_expired_seconds;
126
85.7k
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
127
128
85.7k
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
129
85.7k
    TxnTabletKey txn_key(txn_id, tablet_id);
130
85.7k
    _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration);
131
85.7k
    _expiration_map.emplace(txn_expiration, txn_key);
132
85.7k
}
133
134
1
void CloudCommittedRSMgr::_clean_thread_callback() {
135
14
    do {
136
14
        remove_expired_committed_rowsets();
137
14
    } while (!_stop_latch.wait_for(
138
14
            std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
139
1
}
140
} // namespace doris