be/src/cloud/cloud_committed_rs_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_committed_rs_mgr.h" |
19 | | |
20 | | #include <chrono> |
21 | | |
22 | | #include "cloud/config.h" |
23 | | #include "common/logging.h" |
24 | | #include "storage/rowset/rowset_meta.h" |
25 | | #include "util/thread.h" |
26 | | |
27 | | namespace doris { |
28 | 13 | CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {} |
29 | | |
30 | 12 | CloudCommittedRSMgr::~CloudCommittedRSMgr() { |
31 | 12 | _stop_latch.count_down(); |
32 | 12 | if (_clean_thread) { |
33 | 0 | _clean_thread->join(); |
34 | 0 | } |
35 | 12 | } |
36 | | |
37 | 1 | Status CloudCommittedRSMgr::init() { |
38 | 1 | auto st = Thread::create( |
39 | 1 | "CloudCommittedRSMgr", "clean_committed_rs_thread", |
40 | 1 | [this]() { this->_clean_thread_callback(); }, &_clean_thread); |
41 | 1 | if (!st.ok()) { |
42 | 0 | LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << st; |
43 | 0 | } |
44 | 1 | return st; |
45 | 1 | } |
46 | | |
47 | | void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id, |
48 | | RowsetMetaSharedPtr rowset_meta, |
49 | 35.2k | int64_t expiration_time) { |
50 | 35.2k | int64_t txn_expiration_min = |
51 | 35.2k | duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) |
52 | 35.2k | .count() + |
53 | 35.2k | config::tablet_txn_info_min_expired_seconds; |
54 | 35.2k | expiration_time = std::max(txn_expiration_min, expiration_time); |
55 | 35.2k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
56 | 35.2k | TxnTabletKey key(txn_id, tablet_id); |
57 | 35.2k | _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time)); |
58 | 35.2k | _expiration_map.emplace(expiration_time, key); |
59 | 35.2k | LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id |
60 | 35.2k | << ", rowset_id=" << rowset_meta->rowset_id().to_string() |
61 | 35.2k | << ", expiration_time=" << expiration_time; |
62 | 35.2k | } |
63 | | |
64 | | Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset( |
65 | 118k | int64_t txn_id, int64_t tablet_id) { |
66 | 118k | std::shared_lock<std::shared_mutex> rlock(_rwlock); |
67 | 118k | TxnTabletKey key(txn_id, tablet_id); |
68 | 118k | if (auto it = _empty_rowset_markers.find(key); it != _empty_rowset_markers.end()) { |
69 | 83.8k | return std::make_pair(nullptr, it->second); |
70 | 83.8k | } |
71 | 34.6k | auto iter = _committed_rs_map.find(key); |
72 | 34.6k | if (iter == _committed_rs_map.end()) { |
73 | 7 | return ResultError(Status::Error<ErrorCode::NOT_FOUND>( |
74 | 7 | "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id)); |
75 | 7 | } |
76 | 34.6k | return std::make_pair(iter->second.rowset_meta, iter->second.expiration_time); |
77 | 34.6k | } |
78 | | |
79 | 118k | void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) { |
80 | 118k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
81 | 118k | _committed_rs_map.erase({txn_id, tablet_id}); |
82 | 118k | } |
83 | | |
84 | 17 | void CloudCommittedRSMgr::remove_expired_committed_rowsets() { |
85 | 17 | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
86 | 17 | int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>( |
87 | 17 | std::chrono::system_clock::now().time_since_epoch()) |
88 | 17 | .count(); |
89 | | |
90 | 1.61k | while (!_expiration_map.empty()) { |
91 | 1.61k | auto iter = _expiration_map.begin(); |
92 | 1.61k | if (!_committed_rs_map.contains(iter->second) && |
93 | 1.61k | !_empty_rowset_markers.contains(iter->second)) { |
94 | 1.49k | _expiration_map.erase(iter); |
95 | 1.49k | continue; |
96 | 1.49k | } |
97 | 114 | int64_t expiration_time = iter->first; |
98 | 114 | if (expiration_time > current_time) { |
99 | 15 | break; |
100 | 15 | } |
101 | | |
102 | 99 | auto key = iter->second; |
103 | 99 | _expiration_map.erase(iter); |
104 | | |
105 | 99 | auto it_rs = _committed_rs_map.find(key); |
106 | 99 | if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time == expiration_time) { |
107 | 16 | _committed_rs_map.erase(it_rs); |
108 | 16 | LOG(INFO) << "clean expired pending cloud rowset, txn_id=" << key.txn_id |
109 | 16 | << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time; |
110 | 16 | } |
111 | 99 | auto it_empty = _empty_rowset_markers.find(key); |
112 | 99 | if (it_empty != _empty_rowset_markers.end() && it_empty->second == expiration_time) { |
113 | 83 | _empty_rowset_markers.erase(it_empty); |
114 | 83 | LOG(INFO) << "clean expired empty rowset marker, txn_id=" << key.txn_id |
115 | 83 | << ", tablet_id=" << key.tablet_id << ", expiration_time=" << expiration_time; |
116 | 83 | } |
117 | 99 | } |
118 | 17 | } |
119 | | |
120 | | void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id, |
121 | 85.7k | int64_t txn_expiration) { |
122 | 85.7k | int64_t txn_expiration_min = |
123 | 85.7k | duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()) |
124 | 85.7k | .count() + |
125 | 85.7k | config::tablet_txn_info_min_expired_seconds; |
126 | 85.7k | txn_expiration = std::max(txn_expiration_min, txn_expiration); |
127 | | |
128 | 85.7k | std::unique_lock<std::shared_mutex> wlock(_rwlock); |
129 | 85.7k | TxnTabletKey txn_key(txn_id, tablet_id); |
130 | 85.7k | _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration); |
131 | 85.7k | _expiration_map.emplace(txn_expiration, txn_key); |
132 | 85.7k | } |
133 | | |
134 | 1 | void CloudCommittedRSMgr::_clean_thread_callback() { |
135 | 14 | do { |
136 | 14 | remove_expired_committed_rowsets(); |
137 | 14 | } while (!_stop_latch.wait_for( |
138 | 14 | std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds))); |
139 | 1 | } |
140 | | } // namespace doris |