be/src/cloud/cloud_committed_rs_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_committed_rs_mgr.h" |
19 | | |
20 | | #include <chrono> |
21 | | |
22 | | #include "cloud/config.h" |
23 | | #include "common/logging.h" |
24 | | #include "storage/rowset/rowset_meta.h" |
25 | | #include "util/thread.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | 13 | CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {} |
30 | | |
31 | 12 | CloudCommittedRSMgr::~CloudCommittedRSMgr() { |
32 | 12 | _stop_latch.count_down(); |
33 | 12 | if (_clean_thread) { |
34 | 0 | _clean_thread->join(); |
35 | 0 | } |
36 | 12 | } |
37 | | |
38 | 1 | Status CloudCommittedRSMgr::init() { |
39 | 1 | auto st = Thread::create( |
40 | 1 | "CloudCommittedRSMgr", "clean_committed_rs_thread", |
41 | 1 | [this]() { this->_clean_thread_callback(); }, &_clean_thread); |
42 | 1 | if (!st.ok()) { |
43 | 0 | LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << st; |
44 | 0 | } |
45 | 1 | return st; |
46 | 1 | } |
47 | | |
48 | | void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id, |
49 | | RowsetMetaSharedPtr rowset_meta, |
50 | 34.8k | int64_t expiration_time) { |
51 | 34.8k | int64_t txn_expiration_min = |
52 | 34.8k | duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) |
53 | 34.8k | .count() + |
54 | 34.8k | config::tablet_txn_info_min_expired_seconds; |
55 | 34.8k | expiration_time = std::max(txn_expiration_min, expiration_time); |
56 | 34.8k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
57 | 34.8k | TxnTabletKey key(txn_id, tablet_id); |
58 | 34.8k | _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time)); |
59 | 34.8k | _expiration_map.emplace(expiration_time, key); |
60 | 34.8k | LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id |
61 | 34.8k | << ", rowset_id=" << rowset_meta->rowset_id().to_string() |
62 | 34.8k | << ", expiration_time=" << expiration_time; |
63 | 34.8k | } |
64 | | |
65 | | Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset( |
66 | 125k | int64_t txn_id, int64_t tablet_id) { |
67 | 125k | std::shared_lock<std::shared_mutex> rlock(_rwlock); |
68 | 125k | TxnTabletKey key(txn_id, tablet_id); |
69 | 125k | if (auto it = _empty_rowset_markers.find(key); it != _empty_rowset_markers.end()) { |
70 | 91.1k | return std::make_pair(nullptr, it->second); |
71 | 91.1k | } |
72 | 34.1k | auto iter = _committed_rs_map.find(key); |
73 | 34.1k | if (iter == _committed_rs_map.end()) { |
74 | 7 | return ResultError(Status::Error<ErrorCode::NOT_FOUND>( |
75 | 7 | "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id)); |
76 | 7 | } |
77 | 34.0k | return std::make_pair(iter->second.rowset_meta, iter->second.expiration_time); |
78 | 34.1k | } |
79 | | |
80 | 125k | void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) { |
81 | 125k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
82 | 125k | _committed_rs_map.erase({txn_id, tablet_id}); |
83 | 125k | } |
84 | | |
85 | 20 | void CloudCommittedRSMgr::remove_expired_committed_rowsets() { |
86 | 20 | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
87 | 20 | int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>( |
88 | 20 | std::chrono::system_clock::now().time_since_epoch()) |
89 | 20 | .count(); |
90 | | |
91 | 1.36k | while (!_expiration_map.empty()) { |
92 | 1.35k | auto iter = _expiration_map.begin(); |
93 | 1.35k | if (!_committed_rs_map.contains(iter->second) && |
94 | 1.35k | !_empty_rowset_markers.contains(iter->second)) { |
95 | 1.22k | _expiration_map.erase(iter); |
96 | 1.22k | continue; |
97 | 1.22k | } |
98 | 131 | int64_t expiration_time = iter->first; |
99 | 131 | if (expiration_time > current_time) { |
100 | 18 | break; |
101 | 18 | } |
102 | | |
103 | 113 | auto key = iter->second; |
104 | 113 | _expiration_map.erase(iter); |
105 | | |
106 | 113 | auto it_rs = _committed_rs_map.find(key); |
107 | 113 | if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time == expiration_time) { |
108 | 15 | _committed_rs_map.erase(it_rs); |
109 | 15 | LOG(INFO) << "clean expired pending cloud rowset, txn_id=" << key.txn_id |
110 | 15 | << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time; |
111 | 15 | } |
112 | 113 | auto it_empty = _empty_rowset_markers.find(key); |
113 | 113 | if (it_empty != _empty_rowset_markers.end() && it_empty->second == expiration_time) { |
114 | 98 | _empty_rowset_markers.erase(it_empty); |
115 | 98 | LOG(INFO) << "clean expired empty rowset marker, txn_id=" << key.txn_id |
116 | 98 | << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time; |
117 | 98 | } |
118 | 113 | } |
119 | 20 | } |
120 | | |
121 | | void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id, |
122 | 93.0k | int64_t txn_expiration) { |
123 | 93.0k | int64_t txn_expiration_min = |
124 | 93.0k | duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) |
125 | 93.0k | .count() + |
126 | 93.0k | config::tablet_txn_info_min_expired_seconds; |
127 | 93.0k | txn_expiration = std::max(txn_expiration_min, txn_expiration); |
128 | | |
129 | 93.0k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
130 | 93.0k | TxnTabletKey txn_key(txn_id, tablet_id); |
131 | 93.0k | _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration); |
132 | 93.0k | _expiration_map.emplace(txn_expiration, txn_key); |
133 | 93.0k | } |
134 | | |
135 | 1 | void CloudCommittedRSMgr::_clean_thread_callback() { |
136 | 17 | do { |
137 | 17 | remove_expired_committed_rowsets(); |
138 | 17 | } while (!_stop_latch.wait_for( |
139 | 17 | std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds))); |
140 | 1 | } |
141 | | #include "common/compile_check_end.h" |
142 | | } // namespace doris |