Coverage Report

Created: 2026-03-18 22:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_txn_delete_bitmap_cache.h"
19
20
#include <fmt/core.h>
21
22
#include <chrono>
23
#include <memory>
24
#include <shared_mutex>
25
26
#include "cloud/config.h"
27
#include "common/status.h"
28
#include "cpp/sync_point.h"
29
#include "storage/olap_common.h"
30
#include "storage/rowset/rowset_fwd.h"
31
#include "storage/tablet/tablet_meta.h"
32
#include "storage/txn/txn_manager.h"
33
34
namespace doris {
35
36
// Builds the size-bounded LRU cache that stores per-txn delete bitmaps.
// The background clean thread is not started here; init() creates it.
CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
        : LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE,
                         size_in_bytes,
                         LRUCacheType::SIZE,
                         /*stale_sweep_time_s*/ 86400,
                         /*num_shards*/ 4,
                         /*element_count_capacity*/ 0,
                         /*enable_prune*/ true,
                         /*is_lru_k*/ false),
          // A single count_down() (issued by the destructor) ends the clean loop.
          _stop_latch(1) {}
42
43
0
// Stops the background clean thread and waits for it to exit.
//
// In this file _clean_thread is only populated by init() via Thread::create().
// If init() was never called, or thread creation failed, there is no thread to
// join, so guard the handle instead of dereferencing it unconditionally.
CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
    // Releases the wait_for() in _clean_thread_callback() so the loop returns.
    _stop_latch.count_down();
    if (_clean_thread) {
        _clean_thread->join();
    }
}
47
48
0
// Spawns the background thread that periodically evicts expired txn entries
// (see _clean_thread_callback()). A creation failure is logged and the
// resulting status is handed back to the caller unchanged.
Status CloudTxnDeleteBitmapCache::init() {
    auto clean_loop = [this]() { this->_clean_thread_callback(); };
    auto create_status = Thread::create("CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
                                        clean_loop, &_clean_thread);
    if (create_status.ok()) {
        return create_status;
    }
    LOG(WARNING) << "failed to create thread for CloudTxnDeleteBitmapCache, error: "
                 << create_status;
    return create_status;
}
57
58
// Fetches everything cached for (transaction_id, tablet_id).
//
// From the txn entry it fills the rowset, txn expiration, partial-update info,
// the shared publish status and the previous publish info. From the LRU cache it
// fills the delete bitmap and its rowset ids.
//
// Returns NOT_FOUND when no txn entry exists. If the txn entry exists but the
// delete bitmap is no longer in the LRU cache, this still returns OK. In that
// case *delete_bitmap (when delete_bitmap is non-null) becomes an empty bitmap,
// and the shared publish status is reset to INIT so the bitmap gets recalculated.
// Any other error from get_delete_bitmap() is returned as-is.
Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
        TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
        DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
        std::shared_ptr<PartialUpdateInfo>* partial_update_info,
        std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* previous_publish_info) {
    {
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
        TxnKey key(transaction_id, tablet_id);
        DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache.get_tablet_txn_info.not_found", {
            return Status::Error<ErrorCode::NOT_FOUND>(
                    "not found txn info for test, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        });
        auto iter = _txn_map.find(key);
        if (iter == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        }
        *rowset = iter->second.rowset;
        *txn_expiration = iter->second.txn_expiration;
        *partial_update_info = iter->second.partial_update_info;
        *publish_status = iter->second.publish_status;
        *previous_publish_info = iter->second.publish_info;
    }

    auto st = get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, rowset_ids, nullptr);

    if (st.is<ErrorCode::NOT_FOUND>()) {
        // The cached bitmap is gone and *rowset_ids was left as the caller passed
        // it (presumably empty), so the whole delete bitmap will be recalculated
        // in CalcDeleteBitmapTask.
        if (delete_bitmap != nullptr) {
            *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
        }
        // Reset to INIT so the calculation is not skipped. The PublishStatus object
        // is shared with the _txn_map entry. Every other read/write of it in this
        // file happens under _rwlock, so take the exclusive lock here too.
        {
            std::unique_lock<std::shared_mutex> wlock(_rwlock);
            **publish_status = PublishStatus::INIT;
        }
        return Status::OK();
    }
    return st;
}
99
100
// Returns the rowset and cached delete bitmap for (transaction_id, tablet_id).
//
// - If the txn was marked as an empty rowset, returns (nullptr, nullptr).
// - Returns an InternalError (no message) when the txn entry is missing, its
//   publish status is not SUCCEED, or the delete bitmap is not in the LRU cache.
//
// Every cache handle obtained from lookup() is released exactly once. That
// includes the debug point that simulates a cache miss, which previously
// overwrote the handle and leaked the reference.
Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
CloudTxnDeleteBitmapCache::get_rowset_and_delete_bitmap(TTransactionId transaction_id,
                                                        int64_t tablet_id) {
    RowsetSharedPtr rowset;
    {
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
        TxnKey txn_key(transaction_id, tablet_id);
        if (_empty_rowset_markers.contains(txn_key)) {
            return std::make_pair(nullptr, nullptr);
        }
        auto iter = _txn_map.find(txn_key);
        if (iter == _txn_map.end()) {
            return ResultError(Status::InternalError<false>(""));
        }
        if (!(iter->second.publish_status &&
              *(iter->second.publish_status) == PublishStatus::SUCCEED)) {
            return ResultError(Status::InternalError<false>(""));
        }
        rowset = iter->second.rowset;
    }

    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
    CacheKey key(key_str);
    Cache::Handle* handle = lookup(key);

    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
        // Drop the reference taken by lookup() before simulating the miss;
        // overwriting the handle alone would leak it.
        if (handle != nullptr) {
            release(handle);
        }
        handle = nullptr;
        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
                     "when get delete bitmap, txn_id:"
                  << transaction_id << ", tablet_id: " << tablet_id;
    });
    DeleteBitmapCacheValue* val =
            handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
    if (!val) {
        // A non-null handle without a value must still be released.
        if (handle != nullptr) {
            release(handle);
        }
        return ResultError(Status::InternalError<false>(""));
    }
    Defer defer {[this, handle] { release(handle); }};
    return std::make_pair(rowset, val->delete_bitmap);
}
139
140
// Looks up the cached delete bitmap for (transaction_id, tablet_id).
//
// When publish_status is non-null, it is first filled from the txn entry. A
// missing txn entry then yields NOT_FOUND. A miss in the LRU cache also yields
// NOT_FOUND. On a hit, *delete_bitmap and *rowset_ids are filled (each only when
// the corresponding pointer is non-null) and OK is returned.
//
// The cache handle from lookup() is always released, including when the
// cache-miss debug point discards it.
Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* delete_bitmap,
        RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>* publish_status) {
    if (publish_status) {
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
        TxnKey txn_key(transaction_id, tablet_id);
        auto iter = _txn_map.find(txn_key);
        if (iter == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
                    transaction_id);
        }
        *publish_status = iter->second.publish_status;
    }
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
    CacheKey key(key_str);
    Cache::Handle* handle = lookup(key);

    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
        // Drop the reference taken by lookup() before simulating the miss;
        // overwriting the handle alone would leak it.
        if (handle != nullptr) {
            release(handle);
        }
        handle = nullptr;
        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
                     "when get delete bitmap, txn_id:"
                  << transaction_id << ", tablet_id: " << tablet_id;
    });

    DeleteBitmapCacheValue* val =
            handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
    if (val == nullptr) {
        // A non-null handle without a value must still be released.
        if (handle != nullptr) {
            release(handle);
        }
        LOG_INFO("cache missed when get delete bitmap")
                .tag("txn_id", transaction_id)
                .tag("tablet_id", tablet_id);
        return Status::Error<ErrorCode::NOT_FOUND, false>(
                "cache missed when get delete bitmap, tablet_id={}, transaction_id={}", tablet_id,
                transaction_id);
    }
    // get_tablet_txn_info() treats delete_bitmap as nullable, so guard it like rowset_ids.
    if (delete_bitmap != nullptr) {
        *delete_bitmap = val->delete_bitmap;
    }
    if (rowset_ids) {
        *rowset_ids = val->rowset_ids;
    }
    // must call release handle to reduce the reference count,
    // otherwise there will be memory leak
    release(handle);
    return Status::OK();
}
185
186
void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
187
        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
188
        const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
189
0
        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
190
0
    int64_t txn_expiration_min =
191
0
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
192
0
                    .count() +
193
0
            config::tablet_txn_info_min_expired_seconds;
194
0
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
195
0
    {
196
0
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
197
0
        TxnKey txn_key(transaction_id, tablet_id);
198
0
        std::shared_ptr<PublishStatus> publish_status =
199
0
                std::make_shared<PublishStatus>(PublishStatus::INIT);
200
0
        _txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info),
201
0
                                   std::move(publish_status));
202
0
        _expiration_txn.emplace(txn_expiration, txn_key);
203
0
    }
204
0
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
205
0
    CacheKey key(key_str);
206
207
0
    auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
208
0
    size_t charge = sizeof(DeleteBitmapCacheValue);
209
0
    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
210
0
        charge += v.getSizeInBytes();
211
0
    }
212
0
    auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
213
    // must call release handle to reduce the reference count,
214
    // otherwise there will be memory leak
215
0
    release(handle);
216
0
    LOG_INFO("set txn related delete bitmap")
217
0
            .tag("txn_id", transaction_id)
218
0
            .tag("expiration", txn_expiration)
219
0
            .tag("tablet_id", tablet_id)
220
0
            .tag("delete_bitmap_size", charge)
221
0
            .tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
222
0
            .tag("delete_bitmap_cardinality", delete_bitmap->cardinality());
223
0
}
224
225
// Updates the publish state of an existing txn entry and (re)inserts its delete
// bitmap into the LRU cache.
//
// Returns NOT_FOUND when no txn entry exists for (transaction_id, tablet_id), for
// example because it expired and was removed. publish_info is stored only when
// publish_status is SUCCEED. The shared PublishStatus object is overwritten in
// place, so holders of that shared_ptr observe the new value.
Status CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
                                                         int64_t tablet_id,
                                                         DeleteBitmapPtr delete_bitmap,
                                                         const RowsetIdUnorderedSet& rowset_ids,
                                                         PublishStatus publish_status,
                                                         TxnPublishInfo publish_info) {
    {
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
        TxnKey txn_key(transaction_id, tablet_id);
        // Single lookup: reuse the iterator instead of contains() + operator[].
        auto iter = _txn_map.find(txn_key);
        if (iter == _txn_map.end()) {
            return Status::Error<ErrorCode::NOT_FOUND, false>(
                    "not found txn info, tablet_id={}, transaction_id={}, may be expired and be "
                    "removed",
                    tablet_id, transaction_id);
        }
        TxnVal& txn_val = iter->second;
        *(txn_val.publish_status) = publish_status;
        if (publish_status == PublishStatus::SUCCEED) {
            txn_val.publish_info = std::move(publish_info);
        }
    }
    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
    CacheKey key(key_str);

    auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
    size_t charge = sizeof(DeleteBitmapCacheValue);
    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
        charge += v.getSizeInBytes();
    }
    auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
    // must call release handle to reduce the reference count,
    // otherwise there will be memory leak
    release(handle);
    if (config::enable_mow_verbose_log) {
        LOG_INFO("update txn related delete bitmap")
                .tag("txn_id", transaction_id)
                .tag("tablet_id", tablet_id)
                .tag("delete_bitmap_size", charge)
                .tag("delete_bitmap_count", delete_bitmap->get_delete_bitmap_count())
                .tag("delete_bitmap_cardinality", delete_bitmap->cardinality())
                .tag("publish_status", static_cast<int>(publish_status));
    }
    return Status::OK();
}
269
270
0
void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
271
0
    TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
272
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
273
0
    while (!_expiration_txn.empty()) {
274
0
        auto iter = _expiration_txn.begin();
275
0
        bool in_txn_map = _txn_map.find(iter->second) != _txn_map.end();
276
0
        bool in_markers = _empty_rowset_markers.find(iter->second) != _empty_rowset_markers.end();
277
0
        if (!in_txn_map && !in_markers) {
278
0
            _expiration_txn.erase(iter);
279
0
            continue;
280
0
        }
281
0
        int64_t current_time = duration_cast<std::chrono::seconds>(
282
0
                                       std::chrono::system_clock::now().time_since_epoch())
283
0
                                       .count();
284
0
        if (iter->first > current_time) {
285
0
            break;
286
0
        }
287
        // Clean from _txn_map if exists
288
0
        auto txn_iter = _txn_map.find(iter->second);
289
0
        if ((txn_iter != _txn_map.end()) && (iter->first == txn_iter->second.txn_expiration)) {
290
0
            LOG_INFO("clean expired delete bitmap")
291
0
                    .tag("txn_id", txn_iter->first.txn_id)
292
0
                    .tag("expiration", txn_iter->second.txn_expiration)
293
0
                    .tag("tablt_id", txn_iter->first.tablet_id);
294
0
            std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
295
0
                                  std::to_string(txn_iter->first.tablet_id); // Cache key container
296
0
            CacheKey cache_key(key_str);
297
0
            erase(cache_key);
298
0
            _txn_map.erase(iter->second);
299
0
        }
300
        // Clean from _empty_rowset_markers if exists
301
0
        auto marker_iter = _empty_rowset_markers.find(iter->second);
302
0
        if (marker_iter != _empty_rowset_markers.end()) {
303
0
            LOG_INFO("clean expired empty rowset marker")
304
0
                    .tag("txn_id", iter->second.txn_id)
305
0
                    .tag("tablet_id", iter->second.tablet_id);
306
0
            _empty_rowset_markers.erase(marker_iter);
307
0
        }
308
0
        _expiration_txn.erase(iter);
309
0
    }
310
0
}
311
312
void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId transaction_id,
313
0
                                                              int64_t tablet_id) {
314
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
315
0
    TxnKey txn_key(transaction_id, tablet_id);
316
0
    auto txn_iter = _txn_map.find(txn_key);
317
0
    if (txn_iter != _txn_map.end()) {
318
0
        LOG_INFO("remove unused tablet txn info")
319
0
                .tag("txn_id", txn_iter->first.txn_id)
320
0
                .tag("tablt_id", txn_iter->first.tablet_id);
321
0
        std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
322
0
                              std::to_string(txn_iter->first.tablet_id); // Cache key container
323
0
        CacheKey cache_key(key_str);
324
0
        erase(cache_key);
325
0
        _txn_map.erase(txn_key);
326
0
    }
327
0
}
328
329
void CloudTxnDeleteBitmapCache::mark_empty_rowset(TTransactionId txn_id, int64_t tablet_id,
330
0
                                                  int64_t txn_expiration) {
331
0
    int64_t txn_expiration_min =
332
0
            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
333
0
                    .count() +
334
0
            config::tablet_txn_info_min_expired_seconds;
335
0
    txn_expiration = std::max(txn_expiration_min, txn_expiration);
336
337
0
    if (config::enable_mow_verbose_log) {
338
0
        LOG_INFO("mark empty rowset")
339
0
                .tag("txn_id", txn_id)
340
0
                .tag("tablet_id", tablet_id)
341
0
                .tag("expiration", txn_expiration);
342
0
    }
343
0
    std::unique_lock<std::shared_mutex> wlock(_rwlock);
344
0
    TxnKey txn_key(txn_id, tablet_id);
345
0
    _empty_rowset_markers.emplace(txn_key);
346
0
    _expiration_txn.emplace(txn_expiration, txn_key);
347
0
}
348
349
0
// Reports whether mark_empty_rowset() has flagged (txn_id, tablet_id) and the flag
// has not yet been removed by remove_expired_tablet_txn_info().
bool CloudTxnDeleteBitmapCache::is_empty_rowset(TTransactionId txn_id, int64_t tablet_id) {
    const TxnKey txn_key(txn_id, tablet_id);
    std::shared_lock<std::shared_mutex> rlock(_rwlock);
    return _empty_rowset_markers.find(txn_key) != _empty_rowset_markers.end();
}
354
355
0
void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
356
0
    do {
357
0
        remove_expired_tablet_txn_info();
358
0
    } while (!_stop_latch.wait_for(
359
0
            std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
360
0
}
361
362
} // namespace doris