Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_cluster_info.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <condition_variable>
22
#include <mutex>
23
#include <shared_mutex>
24
#include <string>
25
#include <thread>
26
#include <unordered_map>
27
28
#include "runtime/cluster_info.h"
29
30
namespace doris {
31
32
class CloudTablet;
33
34
// Cached cluster status information
35
struct ClusterStatusCache {
36
    int32_t status {0};   // ClusterStatus enum value
37
    int64_t mtime_ms {0}; // Timestamp when status was last changed
38
};
39
40
class CloudClusterInfo : public ClusterInfo {
41
public:
42
    ~CloudClusterInfo();
43
44
6
    bool is_in_standby() const { return _is_in_standby; }
45
2
    void set_is_in_standby(bool flag) { _is_in_standby = flag; }
46
47
    // Get this BE's cluster ID
48
16
    std::string my_cluster_id() const {
49
16
        std::shared_lock lock(_mutex);
50
16
        return _my_cluster_id;
51
16
    }
52
12
    void set_my_cluster_id(const std::string& id) {
53
12
        std::unique_lock lock(_mutex);
54
12
        _my_cluster_id = id;
55
12
    }
56
57
    // Get cached cluster status, returns false if not found
58
13
    bool get_cluster_status(const std::string& id, ClusterStatusCache* cache) const {
59
13
        std::shared_lock lock(_mutex);
60
13
        auto it = _cluster_status_cache.find(id);
61
13
        if (it != _cluster_status_cache.end()) {
62
9
            *cache = it->second;
63
9
            return true;
64
9
        }
65
4
        return false;
66
13
    }
67
68
    // Update cluster status cache
69
8
    void set_cluster_status(const std::string& id, int32_t status, int64_t mtime_ms) {
70
8
        std::unique_lock lock(_mutex);
71
8
        _cluster_status_cache[id] = {status, mtime_ms};
72
8
    }
73
74
    // Clear all cached cluster status
75
1
    void clear_cluster_status_cache() {
76
1
        std::unique_lock lock(_mutex);
77
1
        _cluster_status_cache.clear();
78
1
    }
79
80
    // Start background refresh thread
81
    void start_bg_worker();
82
    // Stop background refresh thread
83
    void stop_bg_worker();
84
85
    // Check if this cluster should skip compaction for the given tablet
86
    // Returns true if should skip (i.e., another cluster should do the compaction)
87
    bool should_skip_compaction(CloudTablet* tablet) const;
88
89
private:
90
    void _bg_worker_func();
91
    void _refresh_cluster_status();
92
93
    bool _is_in_standby = false;
94
95
    mutable std::shared_mutex _mutex;
96
    std::string _my_cluster_id;
97
    std::unordered_map<std::string, ClusterStatusCache> _cluster_status_cache;
98
99
    // Background worker
100
    std::thread _bg_worker;
101
    std::atomic<bool> _bg_worker_stopped {true};
102
    std::mutex _bg_worker_mutex;
103
    std::condition_variable _bg_worker_cv;
104
};
105
106
} // namespace doris