Coverage Report

Created: 2026-06-01 14:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/bvar_windowed_adder.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/mutex.h>
21
#include <bvar/bvar.h>
22
#include <bvar/multi_dimension.h>
23
#include <bvar/window.h>
24
25
#include <cstdint>
26
#include <list>
27
#include <map>
28
#include <memory>
29
#include <mutex>
30
#include <string>
31
#include <utility>
32
#include <vector>
33
34
namespace doris {
35
36
/**
37
 * Multi-dimension windowed adder.
38
 *
39
 * For each dimension value combination (e.g., job_id), automatically creates:
40
 *   - A bvar::Adder (cumulative counter managed by MultiDimension)
41
 *   - Multiple bvar::Window instances (sliding window views at different time scales)
42
 *
43
 * Windows are lazily created on first write to a dimension value.
44
 *
45
 * @example
46
 *   MBvarWindowedAdder requested_seg_num(
47
 *       "warmup_ed_requested_segment_num",
48
 *       {"job_id"},
49
 *       {300, 1800, 7200}
50
 *   );
51
 *   requested_seg_num.put({"13419"}, 1);
52
 */
53
class MBvarWindowedAdder {
54
public:
55
    MBvarWindowedAdder(const std::string& name, const std::initializer_list<std::string>& dim_names,
56
                       std::vector<int> window_seconds, bool expose = true)
57
21
            : name_(name),
58
21
              window_seconds_(std::move(window_seconds)),
59
21
              md_total_(std::list<std::string>(dim_names)),
60
21
              expose_(expose) {
61
21
        if (expose_) {
62
9
            md_total_.expose(name_ + "_total");
63
9
        }
64
21
    }
65
66
12
    void put(const std::initializer_list<std::string>& dim_values, int64_t value) {
67
12
        auto* adder = md_total_.get_stats(std::list<std::string>(dim_values));
68
12
        if (!adder) return;
69
12
        ensure_windows(dim_values, adder);
70
12
        *adder << value;
71
12
    }
72
73
    /** Get the current window value for the specified dimension and window index. */
74
    int64_t get_window_value(const std::initializer_list<std::string>& dim_values,
75
8
                             size_t window_idx) {
76
8
        std::lock_guard<bthread::Mutex> lock(mutex_);
77
8
        auto it = dims_.find(make_key(dim_values));
78
8
        if (it == dims_.end() || window_idx >= it->second.windows.size()) {
79
4
            return 0;
80
4
        }
81
4
        return it->second.windows[window_idx]->get_value();
82
8
    }
83
84
    /** Overload accepting a pre-built key string (e.g., "job_id,table_id"). */
85
4
    int64_t get_window_value(const std::string& dim_key, size_t window_idx) {
86
4
        std::lock_guard<bthread::Mutex> lock(mutex_);
87
4
        auto it = dims_.find(dim_key);
88
4
        if (it == dims_.end() || window_idx >= it->second.windows.size()) {
89
2
            return 0;
90
2
        }
91
2
        return it->second.windows[window_idx]->get_value();
92
4
    }
93
94
    /** List all dimension key strings that have been seen. */
95
4
    std::vector<std::string> list_dimensions() const {
96
4
        std::lock_guard<bthread::Mutex> lock(mutex_);
97
4
        std::vector<std::string> result;
98
4
        result.reserve(dims_.size());
99
5
        for (auto& [key, _] : dims_) {
100
5
            result.push_back(key);
101
5
        }
102
4
        return result;
103
4
    }
104
105
0
    void hide() {
106
0
        std::lock_guard<bthread::Mutex> lock(mutex_);
107
0
        if (!expose_) {
108
0
            return;
109
0
        }
110
0
        expose_ = false;
111
0
        md_total_.hide();
112
0
        for (auto& [_, entry] : dims_) {
113
0
            for (auto& window : entry.windows) {
114
0
                window->hide();
115
0
            }
116
0
        }
117
0
    }
118
119
private:
120
    struct DimEntry {
121
        bvar::Adder<int64_t>* adder; // owned by MultiDimension
122
        std::vector<std::unique_ptr<bvar::Window<bvar::Adder<int64_t>>>> windows;
123
    };
124
125
    void ensure_windows(const std::initializer_list<std::string>& dim_values,
126
12
                        bvar::Adder<int64_t>* adder) {
127
12
        std::string key = make_key(dim_values);
128
12
        std::lock_guard<bthread::Mutex> lock(mutex_);
129
12
        if (dims_.count(key)) return;
130
9
        DimEntry entry;
131
9
        entry.adder = adder;
132
11
        for (int ws : window_seconds_) {
133
11
            if (expose_) {
134
11
                std::string wname = name_ + "_" + std::to_string(ws) + "s_" + key;
135
11
                entry.windows.emplace_back(
136
11
                        std::make_unique<bvar::Window<bvar::Adder<int64_t>>>(wname, adder, ws));
137
11
            } else {
138
0
                entry.windows.emplace_back(
139
0
                        std::make_unique<bvar::Window<bvar::Adder<int64_t>>>(adder, ws));
140
0
            }
141
11
        }
142
9
        dims_[key] = std::move(entry);
143
9
    }
144
145
20
    static std::string make_key(const std::initializer_list<std::string>& dim_values) {
146
20
        std::string result;
147
21
        for (auto& v : dim_values) {
148
21
            if (!result.empty()) result += ",";
149
21
            result += v;
150
21
        }
151
20
        return result;
152
20
    }
153
154
    std::string name_;
155
    std::vector<int> window_seconds_;
156
    bvar::MultiDimension<bvar::Adder<int64_t>> md_total_;
157
    bool expose_;
158
    mutable bthread::Mutex mutex_;
159
    std::map<std::string, DimEntry> dims_;
160
};
161
162
} // namespace doris