Coverage Report

Created: 2026-03-15 22:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_txn_delete_bitmap_cache.h"
19
20
#include <fmt/core.h>
21
22
#include <chrono>
23
#include <memory>
24
#include <shared_mutex>
25
26
#include "cloud/config.h"
27
#include "common/status.h"
28
#include "cpp/sync_point.h"
29
#include "storage/olap_common.h"
30
#include "storage/tablet/tablet_meta.h"
31
#include "storage/txn/txn_manager.h"
32
33
namespace doris {
34
35
// Constructs the cache on top of LRUCachePolicy.
//
// `size_in_bytes` is forwarded to the base policy together with
// LRUCacheType::SIZE; the remaining base-class arguments are fixed constants
// annotated below. The stop latch starts at 1 and is counted down by the
// destructor.
CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
        : LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE,
                         size_in_bytes,
                         LRUCacheType::SIZE,
                         86400, // stale_sweep_time_s
                         4,     // num_shards
                         0,     // element_count_capacity
                         true,  // enable_prune
                         false  // is_lru_k
                         ),
          _stop_latch(1) {}
41
42
0
// Signals the cleaner thread to stop (by counting down the stop latch) and
// waits for it to finish.
CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
    _stop_latch.count_down();
    // The thread handle is only populated by a successful init(). If init()
    // was never called, or Thread::create() failed, there is nothing to join
    // and dereferencing the empty handle would crash.
    if (_clean_thread) {
        _clean_thread->join();
    }
}
46
47
0
// Starts the background thread that runs _clean_thread_callback().
//
// Returns whatever Thread::create() reports; a failure is additionally
// logged as a warning before being returned.
Status CloudTxnDeleteBitmapCache::init() {
    auto create_status =
            Thread::create("CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
                           [this] { _clean_thread_callback(); }, &_clean_thread);
    if (create_status.ok()) {
        return create_status;
    }
    LOG(WARNING) << "failed to create thread for CloudTxnDeleteBitmapCache, error: "
                 << create_status;
    return create_status;
}
56
57
// Fetches everything recorded for (transaction_id, tablet_id).
//
// Under a shared lock the entry in _txn_map is copied into *rowset,
// *txn_expiration, *partial_update_info, *publish_status and
// *previous_publish_info; NOT_FOUND is returned if there is no such entry.
// The delete bitmap and rowset ids are then read through get_delete_bitmap().
// If that lookup reports NOT_FOUND, the call still succeeds: *delete_bitmap
// (when the pointer is non-null) is replaced by a fresh DeleteBitmap for the
// tablet and the shared publish status is reset to INIT.
Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
        TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
        DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
        std::shared_ptr<PartialUpdateInfo>* partial_update_info,
        std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* previous_publish_info) {
    {
        std::shared_lock<std::shared_mutex> read_guard(_rwlock);
        TxnKey key(transaction_id, tablet_id);
        DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found", {
            return Status::Error<ErrorCode::NOT_FOUND>(
                    "not found txn info for test, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        });
        auto found = _txn_map.find(key);
        if (found == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        }
        const auto& txn_val = found->second;
        *rowset = txn_val.rowset;
        *txn_expiration = txn_val.txn_expiration;
        *partial_update_info = txn_val.partial_update_info;
        *publish_status = txn_val.publish_status;
        *previous_publish_info = txn_val.publish_info;
    }

    auto bitmap_status =
            get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);
    if (!bitmap_status.is<ErrorCode::NOT_FOUND>()) {
        return bitmap_status;
    }

    // Because of the rowset_ids become empty, all delete bitmap
    // will be recalculate in CalcDeleteBitmapTask
    if (delete_bitmap != nullptr) {
        *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
    }
    // to avoid to skip calculating
    **publish_status = PublishStatus::INIT;
    return Status::OK();
}
98
99
// Reads the cached delete bitmap (and optionally the rowset ids and publish
// status) for (transaction_id, tablet_id).
//
// Parameters:
//   delete_bitmap   - out; receives the cached bitmap on a hit. May be null,
//                     in which case the bitmap is simply not returned.
//   rowset_ids      - out; receives the cached rowset ids on a hit. May be null.
//   publish_status  - out; when non-null, the entry in _txn_map is looked up
//                     under a shared lock first and its publish status is
//                     returned here.
//
// Returns NOT_FOUND when publish_status is requested but _txn_map has no
// entry, or when the LRU cache has no value for the key; OK otherwise.
Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap,
        RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>* publish_status) {
    if (publish_status) {
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
        TxnKey txn_key(transaction_id, tablet_id);
        auto iter = _txn_map.find(txn_key);
        if (iter == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        }
        *publish_status = iter->second.publish_status;
    }
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
    CacheKey key(key_str);
    Cache::Handle* handle = lookup(key);

    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
        // Drop the reference taken by lookup() before simulating the miss,
        // otherwise the handle would never be released.
        if (handle != nullptr) {
            release(handle);
        }
        handle = nullptr;
        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
                     "when get delete bitmap, txn_id:"
                  << transaction_id << ", tablet_id: " << tablet_id;
    });

    DeleteBitmapCacheValue* val =
            handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
    if (val == nullptr) {
        LOG_INFO("cache missed when get delete bitmap")
                .tag("txn_id", transaction_id)
                .tag("tablet_id", tablet_id);
        return Status::Error<ErrorCode::NOT_FOUND, false>(
                "cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
                transaction_id);
    }

    // get_tablet_txn_info() treats delete_bitmap as optional, so do the same
    // here instead of dereferencing it unconditionally.
    if (delete_bitmap) {
        *delete_bitmap = val->delete_bitmap;
    }
    if (rowset_ids) {
        *rowset_ids = val->rowset_ids;
    }
    // must call release handle to reduce the reference count,
    // otherwise there will be memory leak
    release(handle);
    return Status::OK();
}
144
145
void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
146
        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
147
        const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
148
0
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
149
0
    int64_t txn_expiration_min =
150
0
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
151
0
                    .count() +
152
0
            config::tablet_txn_info_min_expired_seconds;
153
0
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
154
0
    {
155
0
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
156
0
        TxnKey txn_key(transaction_id, tablet_id);
157
0
        std::shared_ptr<PublishStatus> publish_status =
158
0
                std::make_shared<PublishStatus>(PublishStatus::INIT);
159
0
        _txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info),
160
0
                                   std::move(publish_status));
161
0
        _expiration_txn.emplace(txn_expiration, txn_key);
162
0
    }
163
0
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
164
0
    CacheKey key(key_str);
165
166
0
    auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
167
0
    size_t charge = sizeof(DeleteBitmapCacheValue);
168
0
    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
169
0
        charge += v.getSizeInBytes();
170
0
    }
171
0
    auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
172
    // must call release handle to reduce the reference count,
173
    // otherwise there will be memory leak
174
0
    release(handle);
175
0
    LOG_INFO("set txn related delete bitmap")
176
0
            .tag("txn_id", transaction_id)
177
0
            .tag("expiration", txn_expiration)
178
0
            .tag("tablet_id", tablet_id)
179
0
            .tag("delete_bitmap_size", charge)
180
0
            .tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
181
0
            .tag("delete_bitmap_cardinality", delete_bitmap->cardinality());
182
0
}
183
184
// Updates the publish status of an existing transaction entry and replaces
// its cached delete bitmap.
//
// Under the write lock the entry for (transaction_id, tablet_id) is looked up
// in _txn_map; NOT_FOUND is returned if it is absent. Its shared publish
// status is overwritten with `publish_status`, and `publish_info` is stored
// only when the new status is SUCCEED. Afterwards the bitmap and rowset ids
// are inserted into the LRU cache under the "txn/tablet" key, charged with
// sizeof(DeleteBitmapCacheValue) plus the byte size of every bitmap.
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
                                                         int64_t tablet_id,
                                                         DeleteBitmapPtr delete_bitmap,
                                                         const RowsetIdUnorderedSet& rowset_ids,
                                                         PublishStatus publish_status,
                                                         TxnPublishInfo publish_info) {
    {
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
        TxnKey txn_key(transaction_id, tablet_id);
        // Single lookup: the found iterator is used for both the existence
        // check and the update.
        auto txn_iter = _txn_map.find(txn_key);
        if (txn_iter == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
                    "removed",
                    tablet_id, transaction_id);
        }
        TxnVal& txn_val = txn_iter->second;
        *(txn_val.publish_status) = publish_status;
        if (publish_status == PublishStatus::SUCCEED) {
            txn_val.publish_info = publish_info;
        }
    }
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
    CacheKey key(key_str);

    auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
    size_t charge = sizeof(DeleteBitmapCacheValue);
    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
        charge += v.getSizeInBytes();
    }
    auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
    // must call release handle to reduce the reference count,
    // otherwise there will be memory leak
    release(handle);
    if (config::enable_mow_verbose_log) {
        LOG_INFO("update txn related delete bitmap")
                .tag("txn_id", transaction_id)
                .tag("tablet_id", tablet_id)
                .tag("delete_bitmap_size", charge)
                .tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
                .tag("delete_bitmap_cardinality", delete_bitmap->cardinality())
                .tag("publish_status", static_cast<int>(publish_status));
    }
    return Status::OK();
}
228
229
0
void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
230
0
    TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
231
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
232
0
    while (!_expiration_txn.empty()) {
233
0
        auto iter = _expiration_txn.begin();
234
0
        bool in_txn_map = _txn_map.find(iter->second) != _txn_map.end();
235
0
        bool in_markers = _empty_rowset_markers.find(iter->second) != _empty_rowset_markers.end();
236
0
        if (!in_txn_map && !in_markers) {
237
0
            _expiration_txn.erase(iter);
238
0
            continue;
239
0
        }
240
0
        int64_t current_time = duration_cast<std::chrono::seconds>(
241
0
                                       std::chrono::system_clock::now().time_since_epoch())
242
0
                                       .count();
243
0
        if (iter->first > current_time) {
244
0
            break;
245
0
        }
246
        // Clean from _txn_map if exists
247
0
        auto txn_iter = _txn_map.find(iter->second);
248
0
        if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
249
0
            LOG_INFO("clean expired delete bitmap")
250
0
                    .tag("txn_id", txn_iter->first.txn_id)
251
0
                    .tag("expiration", txn_iter->second.txn_expiration)
252
0
                    .tag("tablt_id", txn_iter->first.tablet_id);
253
0
            std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
254
0
                                  std::to_string(txn_iter->first.tablet_id); // Cache key container
255
0
            CacheKey cache_key(key_str);
256
0
            erase(cache_key);
257
0
            _txn_map.erase(iter->second);
258
0
        }
259
        // Clean from _empty_rowset_markers if exists
260
0
        auto marker_iter = _empty_rowset_markers.find(iter->second);
261
0
        if (marker_iter != _empty_rowset_markers.end()) {
262
0
            LOG_INFO("clean expired empty rowset marker")
263
0
                    .tag("txn_id", iter->second.txn_id)
264
0
                    .tag("tablet_id", iter->second.tablet_id);
265
0
            _empty_rowset_markers.erase(marker_iter);
266
0
        }
267
0
        _expiration_txn.erase(iter);
268
0
    }
269
0
}
270
271
void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId transaction_id,
272
0
                                                              int64_t tablet_id) {
273
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
274
0
    TxnKey txn_key(transaction_id, tablet_id);
275
0
    auto txn_iter = _txn_map.find(txn_key);
276
0
    if (txn_iter != _txn_map.end()) {
277
0
        LOG_INFO("remove unused tablet txn info")
278
0
                .tag("txn_id", txn_iter->first.txn_id)
279
0
                .tag("tablt_id", txn_iter->first.tablet_id);
280
0
        std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
281
0
                              std::to_string(txn_iter->first.tablet_id); // Cache key container
282
0
        CacheKey cache_key(key_str);
283
0
        erase(cache_key);
284
0
        _txn_map.erase(txn_key);
285
0
    }
286
0
}
287
288
void CloudTxnDeleteBitmapCache::mark_empty_rowset(TTransactionId txn_id, int64_t tablet_id,
289
0
                                                  int64_t txn_expiration) {
290
0
    int64_t txn_expiration_min =
291
0
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
292
0
                    .count() +
293
0
            config::tablet_txn_info_min_expired_seconds;
294
0
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
295
296
0
    if (config::enable_mow_verbose_log) {
297
0
        LOG_INFO("mark empty rowset")
298
0
                .tag("txn_id", txn_id)
299
0
                .tag("tablet_id", tablet_id)
300
0
                .tag("expiration", txn_expiration);
301
0
    }
302
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
303
0
    TxnKey txn_key(txn_id, tablet_id);
304
0
    _empty_rowset_markers.emplace(txn_key);
305
0
    _expiration_txn.emplace(txn_expiration, txn_key);
306
0
}
307
308
0
// Returns true when an empty-rowset marker exists for (txn_id, tablet_id).
// The lookup is done under a shared lock.
bool CloudTxnDeleteBitmapCache::is_empty_rowset(TTransactionId txn_id, int64_t tablet_id) {
    std::shared_lock<std::shared_mutex> read_guard(_rwlock);
    TxnKey marker_key(txn_id, tablet_id);
    const bool marked = _empty_rowset_markers.find(marker_key) != _empty_rowset_markers.end();
    return marked;
}
313
314
0
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
315
0
    do {
316
0
        remove_expired_tablet_txn_info();
317
0
    } while (!_stop_latch.wait_for(
318
0
            std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
319
0
}
320
321
} // namespace doris