be/src/util/bvar_windowed_adder.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bthread/mutex.h> |
21 | | #include <bvar/bvar.h> |
22 | | #include <bvar/multi_dimension.h> |
23 | | #include <bvar/window.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <list> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <string> |
31 | | #include <utility> |
32 | | #include <vector> |
33 | | |
34 | | namespace doris { |
35 | | |
36 | | /** |
37 | | * Multi-dimension windowed adder. |
38 | | * |
39 | | * For each dimension value combination (e.g., job_id), automatically creates: |
40 | | * - A bvar::Adder (cumulative counter managed by MultiDimension) |
41 | | * - Multiple bvar::Window instances (sliding window views at different time scales) |
42 | | * |
43 | | * Windows are lazily created on first write to a dimension value. |
44 | | * |
45 | | * @example |
46 | | * MBvarWindowedAdder requested_seg_num( |
47 | | * "warmup_ed_requested_segment_num", |
48 | | * {"job_id"}, |
49 | | * {300, 1800, 7200} |
50 | | * ); |
51 | | * requested_seg_num.put({"13419"}, 1); |
52 | | */ |
53 | | class MBvarWindowedAdder { |
54 | | public: |
55 | | MBvarWindowedAdder(const std::string& name, const std::initializer_list<std::string>& dim_names, |
56 | | std::vector<int> window_seconds, bool expose = true) |
57 | 21 | : name_(name), |
58 | 21 | window_seconds_(std::move(window_seconds)), |
59 | 21 | md_total_(std::list<std::string>(dim_names)), |
60 | 21 | expose_(expose) { |
61 | 21 | if (expose_) { |
62 | 9 | md_total_.expose(name_ + "_total"); |
63 | 9 | } |
64 | 21 | } |
65 | | |
66 | 12 | void put(const std::initializer_list<std::string>& dim_values, int64_t value) { |
67 | 12 | auto* adder = md_total_.get_stats(std::list<std::string>(dim_values)); |
68 | 12 | if (!adder) return; |
69 | 12 | ensure_windows(dim_values, adder); |
70 | 12 | *adder << value; |
71 | 12 | } |
72 | | |
73 | | /** Get the current window value for the specified dimension and window index. */ |
74 | | int64_t get_window_value(const std::initializer_list<std::string>& dim_values, |
75 | 8 | size_t window_idx) { |
76 | 8 | std::lock_guard<bthread::Mutex> lock(mutex_); |
77 | 8 | auto it = dims_.find(make_key(dim_values)); |
78 | 8 | if (it == dims_.end() || window_idx >= it->second.windows.size()) { |
79 | 4 | return 0; |
80 | 4 | } |
81 | 4 | return it->second.windows[window_idx]->get_value(); |
82 | 8 | } |
83 | | |
84 | | /** Overload accepting a pre-built key string (e.g., "job_id,table_id"). */ |
85 | 4 | int64_t get_window_value(const std::string& dim_key, size_t window_idx) { |
86 | 4 | std::lock_guard<bthread::Mutex> lock(mutex_); |
87 | 4 | auto it = dims_.find(dim_key); |
88 | 4 | if (it == dims_.end() || window_idx >= it->second.windows.size()) { |
89 | 2 | return 0; |
90 | 2 | } |
91 | 2 | return it->second.windows[window_idx]->get_value(); |
92 | 4 | } |
93 | | |
94 | | /** List all dimension key strings that have been seen. */ |
95 | 4 | std::vector<std::string> list_dimensions() const { |
96 | 4 | std::lock_guard<bthread::Mutex> lock(mutex_); |
97 | 4 | std::vector<std::string> result; |
98 | 4 | result.reserve(dims_.size()); |
99 | 5 | for (auto& [key, _] : dims_) { |
100 | 5 | result.push_back(key); |
101 | 5 | } |
102 | 4 | return result; |
103 | 4 | } |
104 | | |
105 | 0 | void hide() { |
106 | 0 | std::lock_guard<bthread::Mutex> lock(mutex_); |
107 | 0 | if (!expose_) { |
108 | 0 | return; |
109 | 0 | } |
110 | 0 | expose_ = false; |
111 | 0 | md_total_.hide(); |
112 | 0 | for (auto& [_, entry] : dims_) { |
113 | 0 | for (auto& window : entry.windows) { |
114 | 0 | window->hide(); |
115 | 0 | } |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | private: |
120 | | struct DimEntry { |
121 | | bvar::Adder<int64_t>* adder; // owned by MultiDimension |
122 | | std::vector<std::unique_ptr<bvar::Window<bvar::Adder<int64_t>>>> windows; |
123 | | }; |
124 | | |
125 | | void ensure_windows(const std::initializer_list<std::string>& dim_values, |
126 | 12 | bvar::Adder<int64_t>* adder) { |
127 | 12 | std::string key = make_key(dim_values); |
128 | 12 | std::lock_guard<bthread::Mutex> lock(mutex_); |
129 | 12 | if (dims_.count(key)) return; |
130 | 9 | DimEntry entry; |
131 | 9 | entry.adder = adder; |
132 | 11 | for (int ws : window_seconds_) { |
133 | 11 | if (expose_) { |
134 | 11 | std::string wname = name_ + "_" + std::to_string(ws) + "s_" + key; |
135 | 11 | entry.windows.emplace_back( |
136 | 11 | std::make_unique<bvar::Window<bvar::Adder<int64_t>>>(wname, adder, ws)); |
137 | 11 | } else { |
138 | 0 | entry.windows.emplace_back( |
139 | 0 | std::make_unique<bvar::Window<bvar::Adder<int64_t>>>(adder, ws)); |
140 | 0 | } |
141 | 11 | } |
142 | 9 | dims_[key] = std::move(entry); |
143 | 9 | } |
144 | | |
145 | 20 | static std::string make_key(const std::initializer_list<std::string>& dim_values) { |
146 | 20 | std::string result; |
147 | 21 | for (auto& v : dim_values) { |
148 | 21 | if (!result.empty()) result += ","; |
149 | 21 | result += v; |
150 | 21 | } |
151 | 20 | return result; |
152 | 20 | } |
153 | | |
154 | | std::string name_; |
155 | | std::vector<int> window_seconds_; |
156 | | bvar::MultiDimension<bvar::Adder<int64_t>> md_total_; |
157 | | bool expose_; |
158 | | mutable bthread::Mutex mutex_; |
159 | | std::map<std::string, DimEntry> dims_; |
160 | | }; |
161 | | |
162 | | } // namespace doris |