/root/doris/cloud/src/common/metric.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "metric.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | #include <rapidjson/document.h> |
22 | | #include <rapidjson/encodings.h> |
23 | | #include <rapidjson/error/en.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <memory> |
27 | | #include <optional> |
28 | | #include <set> |
29 | | #include <string> |
30 | | #include <string_view> |
31 | | #include <unordered_map> |
32 | | #include <vector> |
33 | | |
34 | | #include "common/bvars.h" |
35 | | #include "common/logging.h" |
36 | | #include "meta-store/keys.h" |
37 | | #include "meta-store/txn_kv.h" |
38 | | #include "meta-store/txn_kv_error.h" |
39 | | |
40 | | namespace doris::cloud { |
// Returns the set of known key-category prefixes (the first decoded part of a user key, such as
// "meta" or "txn"); defined in another translation unit.
extern std::set<std::string> get_key_prefix_contants();

// Key read by get_fdb_status(); its value is parsed as the JSON status document.
// The format of the output is shown in "test/fdb_metric_example.json"
static const std::string FDB_STATUS_KEY {"\xff\xff/status/json"};
45 | | |
46 | 19 | static std::string get_fdb_status(TxnKv* txn_kv) { |
47 | 19 | std::unique_ptr<Transaction> txn; |
48 | 19 | TxnErrorCode err = txn_kv->create_txn(&txn); |
49 | 19 | if (err != TxnErrorCode::TXN_OK) { |
50 | 0 | LOG(WARNING) << "failed to create_txn, err=" << err; |
51 | 0 | return ""; |
52 | 0 | } |
53 | 19 | std::string status_val; |
54 | 19 | err = txn->get(FDB_STATUS_KEY, &status_val); |
55 | 19 | if (err != TxnErrorCode::TXN_OK) { |
56 | 9 | LOG(WARNING) << "failed to get FDB_STATUS_KEY, err=" << err; |
57 | 9 | return ""; |
58 | 9 | } |
59 | 10 | return status_val; |
60 | 19 | } |
61 | | |
62 | | // The format of fdb status details: |
63 | | // |
64 | | // Configuration: |
65 | | // Redundancy mode - double |
66 | | // Storage engine - ssd-2 |
67 | | // Coordinators - 3 |
68 | | // Usable Regions - 1 |
69 | | |
70 | | // Cluster: |
71 | | // FoundationDB processes - 15 |
72 | | // Zones - 3 |
73 | | // Machines - 3 |
74 | | // Memory availability - 2.9 GB per process on machine with least available |
75 | | // >>>>> (WARNING: 4.0 GB recommended) <<<<< |
76 | | // Retransmissions rate - 3 Hz |
77 | | // Fault Tolerance - 1 machines |
78 | | // Server time - 02/16/23 16:48:14 |
79 | | |
80 | | // Data: |
81 | | // Replication health - Healthy |
82 | | // Moving data - 0.000 GB |
83 | | // Sum of key-value sizes - 4.317 GB |
84 | | // Disk space used - 11.493 GB |
85 | | |
86 | | // Operating space: |
87 | | // Storage server - 462.8 GB free on most full server |
88 | | // Log server - 462.8 GB free on most full server |
89 | | |
90 | | // Workload: |
91 | | // Read rate - 84 Hz |
92 | | // Write rate - 4 Hz |
93 | | // Transactions started - 222 Hz |
94 | | // Transactions committed - 4 Hz |
95 | | // Conflict rate - 0 Hz |
96 | | |
97 | | // Backup and DR: |
98 | | // Running backups - 0 |
99 | | // Running DRs - 0 |
100 | | |
101 | 19 | static void export_fdb_status_details(const std::string& status_str) { |
102 | 19 | using namespace rapidjson; |
103 | 19 | Document document; |
104 | 19 | try { |
105 | 19 | document.Parse(status_str.c_str()); |
106 | 19 | if (document.HasParseError()) { |
107 | 9 | LOG(WARNING) << "fail to parse status str, err: " |
108 | 9 | << GetParseError_En(document.GetParseError()); |
109 | 9 | return; |
110 | 9 | } |
111 | 19 | } catch (std::exception& e) { |
112 | 0 | LOG(WARNING) << "fail to parse status str, err: " << e.what(); |
113 | 0 | return; |
114 | 0 | } |
115 | | |
116 | 10 | if (!document.HasMember("cluster") || !document.HasMember("client")) { |
117 | 2 | LOG(WARNING) << "err fdb status details"; |
118 | 2 | return; |
119 | 2 | } |
120 | 168 | auto get_value = [&](const std::vector<const char*>& v) -> int64_t { |
121 | 168 | if (v.empty()) return BVAR_FDB_INVALID_VALUE; |
122 | 168 | auto node = document.FindMember("cluster"); |
123 | 344 | for (const auto& name : v) { |
124 | 344 | if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE; |
125 | 342 | node = node->value.FindMember(name); |
126 | 342 | } |
127 | 166 | if (node->value.IsInt64()) return node->value.GetInt64(); |
128 | 22 | if (node->value.IsDouble()) return static_cast<int64_t>(node->value.GetDouble()); |
129 | 22 | if (node->value.IsObject()) return node->value.MemberCount(); |
130 | 8 | if (node->value.IsArray()) return node->value.Size(); |
131 | 0 | return BVAR_FDB_INVALID_VALUE; |
132 | 8 | }; |
133 | 8 | auto get_string_value = [&](const std::vector<const char*>& v) -> std::string { |
134 | 8 | if (v.empty()) return "invalid"; |
135 | 8 | auto node = document.FindMember("cluster"); |
136 | 24 | for (const auto& name : v) { |
137 | 24 | if (!node->value.HasMember(name)) return "invalid"; |
138 | 24 | node = node->value.FindMember(name); |
139 | 24 | } |
140 | 8 | if (node->value.IsString()) return node->value.GetString(); |
141 | 0 | return "invalid"; |
142 | 8 | }; |
143 | 40 | auto get_nanoseconds = [&](const std::vector<const char*>& v) -> int64_t { |
144 | 40 | constexpr double NANOSECONDS = 1e9; |
145 | 40 | auto node = document.FindMember("cluster"); |
146 | 96 | for (const auto& name : v) { |
147 | 96 | if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE; |
148 | 96 | node = node->value.FindMember(name); |
149 | 96 | } |
150 | 40 | if (node->value.IsInt64()) return node->value.GetInt64() * NANOSECONDS; |
151 | 32 | DCHECK(node->value.IsDouble()); |
152 | 32 | return static_cast<int64_t>(node->value.GetDouble() * NANOSECONDS); |
153 | 40 | }; |
154 | | // Configuration |
155 | 8 | g_bvar_fdb_configuration_coordinators_count.set_value( |
156 | 8 | get_value({"configuration", "coordinators_count"})); |
157 | 8 | g_bvar_fdb_configuration_usable_regions.set_value( |
158 | 8 | get_value({"configuration", "usable_regions"})); |
159 | | |
160 | | // Cluster |
161 | 8 | g_bvar_fdb_process_count.set_value(get_value({"processes"})); |
162 | 8 | g_bvar_fdb_machines_count.set_value(get_value({"machines"})); |
163 | 8 | g_bvar_fdb_fault_tolerance_count.set_value( |
164 | 8 | get_value({"fault_tolerance", "max_zone_failures_without_losing_data"})); |
165 | 8 | g_bvar_fdb_generation.set_value(get_value({"generation"})); |
166 | 8 | g_bvar_fdb_incompatible_connections.set_value(get_value({"incompatible_connections"})); |
167 | | |
168 | | // Data/Operating space |
169 | 8 | g_bvar_fdb_data_average_partition_size_bytes.set_value( |
170 | 8 | get_value({"data", "average_partition_size_bytes"})); |
171 | 8 | g_bvar_fdb_data_partition_count.set_value(get_value({"data", "partitions_count"})); |
172 | 8 | g_bvar_fdb_data_total_disk_used_bytes.set_value(get_value({"data", "total_disk_used_bytes"})); |
173 | 8 | g_bvar_fdb_data_total_kv_size_bytes.set_value(get_value({"data", "total_kv_size_bytes"})); |
174 | 8 | g_bvar_fdb_data_log_server_space_bytes.set_value( |
175 | 8 | get_value({"data", "least_operating_space_bytes_log_server"})); |
176 | 8 | g_bvar_fdb_data_storage_server_space_bytes.set_value( |
177 | 8 | get_value({"data", "least_operating_space_bytes_storage_server"})); |
178 | 8 | g_bvar_fdb_data_moving_data_highest_priority.set_value( |
179 | 8 | get_value({"data", "moving_data", "highest_priority"})); |
180 | 8 | g_bvar_fdb_data_moving_data_in_flight_bytes.set_value( |
181 | 8 | get_value({"data", "moving_data", "in_flight_bytes"})); |
182 | 8 | g_bvar_fdb_data_moving_data_in_queue_bytes.set_value( |
183 | 8 | get_value({"data", "moving_data", "in_queue_bytes"})); |
184 | 8 | g_bvar_fdb_data_moving_total_written_bytes.set_value( |
185 | 8 | get_value({"data", "moving_data", "total_written_bytes"})); |
186 | 8 | g_bvar_fdb_data_state_min_replicas_remaining.set_value( |
187 | 8 | get_value({"data", "state", "min_replicas_remaining"})); |
188 | | |
189 | | // Latency probe |
190 | 8 | g_bvar_fdb_latency_probe_transaction_start_ns.set_value( |
191 | 8 | get_nanoseconds({"latency_probe", "transaction_start_seconds"})); |
192 | 8 | g_bvar_fdb_latency_probe_commit_ns.set_value( |
193 | 8 | get_nanoseconds({"latency_probe", "commit_seconds"})); |
194 | 8 | g_bvar_fdb_latency_probe_read_ns.set_value(get_nanoseconds({"latency_probe", "read_seconds"})); |
195 | | |
196 | | // QOS |
197 | 8 | g_bvar_fdb_qos_worst_data_lag_storage_server_ns.set_value( |
198 | 8 | get_nanoseconds({"qos", "worst_data_lag_storage_server", "seconds"})); |
199 | 8 | g_bvar_fdb_qos_worst_durability_lag_storage_server_ns.set_value( |
200 | 8 | get_nanoseconds({"qos", "worst_durability_lag_storage_server", "seconds"})); |
201 | 8 | g_bvar_fdb_qos_worst_log_server_queue_bytes.set_value( |
202 | 8 | get_value({"qos", "worst_queue_bytes_log_server"})); |
203 | 8 | g_bvar_fdb_qos_worst_storage_server_queue_bytes.set_value( |
204 | 8 | get_value({"qos", "worst_queue_bytes_storage_server"})); |
205 | | |
206 | | // Backup and DR |
207 | | |
208 | | // Performance Limited By |
209 | | // invalid or not-workload, the final value is -1 |
210 | 8 | int64_t performance_val = |
211 | 8 | get_string_value({"qos", "performance_limited_by", "name"}) == "workload" ? 0 : -1; |
212 | 8 | g_bvar_fdb_performance_limited_by_name.set_value(performance_val); |
213 | | |
214 | | // Client Count |
215 | 8 | g_bvar_fdb_client_count.set_value(get_value({"clients", "count"})); |
216 | | |
217 | | // Coordinators Unreachable Count |
218 | 8 | auto unreachable_count = 0; |
219 | 8 | if (auto node = document.FindMember("client"); node->value.HasMember("coordinators")) { |
220 | 8 | if (node = node->value.FindMember("coordinators"); node->value.HasMember("coordinators")) { |
221 | 8 | if (node = node->value.FindMember("coordinators"); node->value.IsArray()) { |
222 | 24 | for (const auto& c : node->value.GetArray()) { |
223 | 24 | if (c.HasMember("reachable") && c.FindMember("reachable")->value.IsBool() && |
224 | 24 | !c.FindMember("reachable")->value.GetBool()) { |
225 | 0 | ++unreachable_count; |
226 | 0 | } |
227 | 24 | } |
228 | 8 | g_bvar_fdb_coordinators_unreachable_count.set_value(unreachable_count); |
229 | 8 | } |
230 | 8 | } |
231 | 8 | } |
232 | | |
233 | | // Helper function for recursive name construction |
234 | | // such as: {"disk": {"reads": {"counter": 123, "hz": 0}}} |
235 | | // component is "disk", the names of these two values should be "reads_counter" and "reads_hz" |
236 | 8 | auto recursive_name_helper = [](std::string& origin_name, |
237 | 1.10k | const char* next_level_name) -> std::string { |
238 | 1.10k | return origin_name + '_' + next_level_name; |
239 | 1.10k | }; |
240 | | |
241 | | // Generic recursive function to traverse JSON node and set bvar values |
242 | | // There are three cases here: int64, double, and object. |
243 | | // If it is double or int64, put it directly into the bvar. |
244 | | // If it is an object, recursively obtain the full name and corresponding value. |
245 | 8 | auto recursive_traverse_and_set = [&recursive_name_helper](auto&& set_value_callback, |
246 | 8 | auto&& self, std::string name, |
247 | 2.55k | auto temp_node) -> void { |
248 | | // if the node is an object, then get Member(iter) and recursive with iter as arg |
249 | 2.55k | if (temp_node->value.IsObject()) { |
250 | 1.47k | for (auto iter = temp_node->value.MemberBegin(); iter != temp_node->value.MemberEnd(); |
251 | 1.10k | iter++) { |
252 | 1.10k | self(set_value_callback, self, recursive_name_helper(name, iter->name.GetString()), |
253 | 1.10k | iter); |
254 | 1.10k | } |
255 | 368 | return; |
256 | 368 | } |
257 | | // if not object, set bvar value |
258 | 2.18k | set_value_callback(name, temp_node); |
259 | 2.18k | }; metric.cpp:_ZZN5doris5cloudL25export_fdb_status_detailsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEENK3$_0clIRZZNS0_L25export_fdb_status_detailsES8_ENK3$_1clES6_EUlRS6_RN9rapidjson21GenericMemberIteratorILb0ENSD_4UTF8IcEENSD_19MemoryPoolAllocatorINSD_12CrtAllocatorEEEEEE_RS9_SK_EEvOT_OT0_S6_T1_ Line | Count | Source | 247 | 2.04k | auto temp_node) -> void { | 248 | | // if the node is an object, then get Member(iter) and recursive with iter as arg | 249 | 2.04k | if (temp_node->value.IsObject()) { | 250 | 960 | for (auto iter = temp_node->value.MemberBegin(); iter != temp_node->value.MemberEnd(); | 251 | 720 | iter++) { | 252 | 720 | self(set_value_callback, self, recursive_name_helper(name, iter->name.GetString()), | 253 | 720 | iter); | 254 | 720 | } | 255 | 240 | return; | 256 | 240 | } | 257 | | // if not object, set bvar value | 258 | 1.80k | set_value_callback(name, temp_node); | 259 | 1.80k | }; |
metric.cpp:_ZZN5doris5cloudL25export_fdb_status_detailsERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEENK3$_0clIRZZNS0_L25export_fdb_status_detailsES8_ENK3$_2clES6_EUlRS6_RN9rapidjson21GenericMemberIteratorILb0ENSD_4UTF8IcEENSD_19MemoryPoolAllocatorINSD_12CrtAllocatorEEEEEE_RS9_SK_EEvOT_OT0_S6_T1_ Line | Count | Source | 247 | 512 | auto temp_node) -> void { | 248 | | // if the node is an object, then get Member(iter) and recursive with iter as arg | 249 | 512 | if (temp_node->value.IsObject()) { | 250 | 512 | for (auto iter = temp_node->value.MemberBegin(); iter != temp_node->value.MemberEnd(); | 251 | 384 | iter++) { | 252 | 384 | self(set_value_callback, self, recursive_name_helper(name, iter->name.GetString()), | 253 | 384 | iter); | 254 | 384 | } | 255 | 128 | return; | 256 | 128 | } | 257 | | // if not object, set bvar value | 258 | 384 | set_value_callback(name, temp_node); | 259 | 384 | }; |
|
260 | | |
261 | 24 | auto get_process_metric = [&](std::string component) { |
262 | 24 | auto node = document.FindMember("cluster"); |
263 | 24 | if (!node->value.HasMember("processes")) return; |
264 | 24 | node = node->value.FindMember("processes"); |
265 | | // process |
266 | 384 | for (auto process_node = node->value.MemberBegin(); process_node != node->value.MemberEnd(); |
267 | 360 | process_node++) { |
268 | 360 | const char* process_id = process_node->name.GetString(); |
269 | 360 | decltype(process_node) component_node; |
270 | | // get component iter |
271 | 360 | if (!process_node->value.HasMember(component.data())) continue; |
272 | 360 | component_node = process_node->value.FindMember(component.data()); |
273 | | |
274 | | // set_bvar_value is responsible for setting integer and float values to the corresponding bvar. |
275 | 360 | auto set_bvar_value = [&process_id, &component]( |
276 | 360 | std::string& name, |
277 | 1.80k | decltype(process_node)& temp_node) -> void { |
278 | 1.80k | if (temp_node->value.IsInt64()) { |
279 | 1.44k | g_bvar_fdb_cluster_processes.put( |
280 | 1.44k | {process_id, component, name}, |
281 | 1.44k | static_cast<double>(temp_node->value.GetInt64())); |
282 | 1.44k | return; |
283 | 1.44k | } |
284 | 360 | if (temp_node->value.IsDouble()) { |
285 | 360 | g_bvar_fdb_cluster_processes.put({process_id, component, name}, |
286 | 360 | temp_node->value.GetDouble()); |
287 | 360 | return; |
288 | 360 | } |
289 | 0 | LOG(WARNING) << fmt::format( |
290 | 0 | "Get process metrics set_bvar_value input a wrong type node {}", name); |
291 | 0 | }; |
292 | | // Note that the parameter passed to set_bvar_value here is the current node, not its Member |
293 | | // so we can directly call recursive_traverse_and_set in the loop |
294 | 360 | for (auto metric_node = component_node->value.MemberBegin(); |
295 | 1.68k | metric_node != component_node->value.MemberEnd(); metric_node++) { |
296 | 1.32k | recursive_traverse_and_set(set_bvar_value, recursive_traverse_and_set, |
297 | 1.32k | metric_node->name.GetString(), metric_node); |
298 | 1.32k | } |
299 | 360 | } |
300 | 24 | }; |
301 | | |
302 | 32 | auto get_workload_metric = [&](std::string component) { |
303 | 32 | auto node = document.FindMember("cluster"); |
304 | 32 | if (!node->value.HasMember("workload")) return; |
305 | 32 | node = node->value.FindMember("workload"); |
306 | | |
307 | 32 | if (!node->value.HasMember(component.data())) return; |
308 | 32 | auto component_node = node->value.FindMember(component.data()); |
309 | | |
310 | | // set_bvar_value is responsible for setting integer and float values to the corresponding bvar. |
311 | 32 | auto set_bvar_value = [&component](std::string& name, |
312 | 384 | decltype(component_node)& temp_node) -> void { |
313 | 384 | if (temp_node->value.IsInt64()) { |
314 | 240 | g_bvar_fdb_cluster_workload.put({component, name}, |
315 | 240 | static_cast<double>(temp_node->value.GetInt64())); |
316 | 240 | return; |
317 | 240 | } |
318 | 144 | if (temp_node->value.IsDouble()) { |
319 | 144 | g_bvar_fdb_cluster_workload.put({component, name}, temp_node->value.GetDouble()); |
320 | 144 | return; |
321 | 144 | } |
322 | 0 | LOG(WARNING) << fmt::format( |
323 | 0 | "Get workload metrics set_bvar_value input a wrong type node {}", name); |
324 | 0 | }; |
325 | | |
326 | | // Reuse the common recursive_traverse_and_set function |
327 | 32 | for (auto metric_node = component_node->value.MemberBegin(); |
328 | 160 | metric_node != component_node->value.MemberEnd(); metric_node++) { |
329 | 128 | recursive_traverse_and_set(set_bvar_value, recursive_traverse_and_set, |
330 | 128 | metric_node->name.GetString(), metric_node); |
331 | 128 | } |
332 | 32 | }; |
333 | | |
334 | | // Process Status |
335 | 8 | get_process_metric("cpu"); |
336 | 8 | get_process_metric("disk"); |
337 | 8 | get_process_metric("memory"); |
338 | | |
339 | | // Workload Status |
340 | 8 | get_workload_metric("keys"); |
341 | 8 | get_workload_metric("bytes"); |
342 | 8 | get_workload_metric("operations"); |
343 | 8 | get_workload_metric("transactions"); |
344 | 8 | } |
345 | | |
346 | | // boundaries include the key category{meta, txn, recycle...}, instance_id and sub_category{rowset, txn_label...} |
347 | | // encode look like |
348 | | // 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label} |
349 | | // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} |
350 | | // the func count same key to hashmap kv_range_count |
351 | | // exmaple: |
352 | | // kv_range_boundaries: meta|instance1|rowset|..., meta|instance1|rowset|..., meta|instance2|rowset|..., txn|instance1|txn_label|... |
353 | | // kv_range_count output: <meta|instance1|rowset, 2>, <meta|instance2|rowset, 1>, <txn|instance1|txn_label, 1> |
354 | | void get_kv_range_boundaries_count(std::vector<std::string>& kv_range_boundaries, |
355 | 0 | std::unordered_map<std::string, size_t>& kv_range_count) { |
356 | 0 | size_t prefix_size = FdbTxnKv::fdb_partition_key_prefix().size(); |
357 | 0 | for (auto&& boundary : kv_range_boundaries) { |
358 | 0 | if (boundary.size() < prefix_size + 1 || boundary[prefix_size] != CLOUD_USER_KEY_SPACE01) { |
359 | 0 | continue; |
360 | 0 | } |
361 | | |
362 | 0 | std::string_view user_key(boundary); |
363 | 0 | user_key.remove_prefix(prefix_size + 1); // Skip the KEY_SPACE prefix. |
364 | 0 | std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; |
365 | 0 | decode_key(&user_key, &out); // ignore any error, since the boundary key might be truncated. |
366 | |
|
367 | 0 | auto visitor = [](auto&& arg) -> std::string { |
368 | 0 | using T = std::decay_t<decltype(arg)>; |
369 | 0 | if constexpr (std::is_same_v<T, std::string>) { |
370 | 0 | return arg; |
371 | 0 | } else { |
372 | 0 | return std::to_string(arg); |
373 | 0 | } |
374 | 0 | }; Unexecuted instantiation: metric.cpp:_ZZN5doris5cloud29get_kv_range_boundaries_countERSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaIS7_EERSt13unordered_mapIS7_mSt4hashIS7_ESt8equal_toIS7_ESaISt4pairIKS7_mEEEENK3$_0clIRlEES7_OT_ Unexecuted instantiation: metric.cpp:_ZZN5doris5cloud29get_kv_range_boundaries_countERSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaIS7_EERSt13unordered_mapIS7_mSt4hashIS7_ESt8equal_toIS7_ESaISt4pairIKS7_mEEEENK3$_0clIRS7_EES7_OT_ |
375 | |
|
376 | 0 | if (!out.empty()) { |
377 | 0 | std::string key; |
378 | | // whatever the boundary's category have similar encode part: |
379 | | // category, instance_id, sub_category |
380 | | // we can distinguish boundary using the three parts |
381 | | // some boundaries do not contain all three parts, so restrictions based on size are also necessary |
382 | 0 | for (size_t i = 0; i < 3 && i < out.size(); ++i) { |
383 | 0 | key += std::visit(visitor, std::get<0>(out[i])) + '|'; |
384 | 0 | } |
385 | 0 | key.pop_back(); |
386 | 0 | kv_range_count[key]++; |
387 | 0 | } |
388 | 0 | } |
389 | 0 | } |
390 | | |
391 | 19 | static void export_fdb_kv_ranges_details(TxnKv* kv) { |
392 | 19 | auto* txn_kv = dynamic_cast<FdbTxnKv*>(kv); |
393 | 19 | if (!txn_kv) { |
394 | 19 | LOG(WARNING) << "this method only support fdb txn kv"; |
395 | 19 | return; |
396 | 19 | } |
397 | | |
398 | 0 | std::vector<std::string> partition_boundaries; |
399 | 0 | TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries); |
400 | 0 | if (code != TxnErrorCode::TXN_OK) { |
401 | 0 | auto msg = fmt::format("failed to get boundaries, code={}", code); |
402 | 0 | return; |
403 | 0 | } |
404 | | |
405 | 0 | std::unordered_map<std::string, size_t> partition_count; |
406 | 0 | get_kv_range_boundaries_count(partition_boundaries, partition_count); |
407 | |
|
408 | 0 | auto key_prefix_set = get_key_prefix_contants(); |
409 | 0 | std::unordered_map<std::string, int64_t> category_count; |
410 | 0 | for (auto&& [key, count] : partition_count) { |
411 | 0 | std::vector<std::string> keys; |
412 | 0 | size_t pos {}; |
413 | | // split key with '|' |
414 | 0 | do { |
415 | 0 | size_t p = std::min(key.size(), key.find('|', pos)); |
416 | 0 | keys.emplace_back(key.substr(pos, p - pos)); |
417 | 0 | pos = p + 1; |
418 | 0 | } while (pos < key.size()); |
419 | 0 | keys.resize(3); |
420 | 0 | if (key_prefix_set.contains(keys[0])) { |
421 | 0 | category_count[keys[0]] += count; |
422 | 0 | g_bvar_fdb_kv_ranges_count.put({keys[0], keys[1], keys[2]}, count); |
423 | 0 | } else { |
424 | 0 | LOG(WARNING) << fmt::format("Unknow meta range type: {}", keys[0]); |
425 | 0 | continue; |
426 | 0 | } |
427 | 0 | } |
428 | 0 | } |
429 | | |
430 | 19 | void FdbMetricExporter::export_fdb_metrics(TxnKv* txn_kv) { |
431 | 19 | int64_t busyness = 0; |
432 | 19 | std::string fdb_status = get_fdb_status(txn_kv); |
433 | 19 | export_fdb_status_details(fdb_status); |
434 | 19 | export_fdb_kv_ranges_details(txn_kv); |
435 | 19 | if (auto* kv = dynamic_cast<FdbTxnKv*>(txn_kv); kv != nullptr) { |
436 | 0 | busyness = static_cast<int64_t>(kv->get_client_thread_busyness() * 100); |
437 | 0 | g_bvar_fdb_client_thread_busyness_percent.set_value(busyness); |
438 | 0 | } |
439 | 19 | LOG(INFO) << "finish to collect fdb metric, client busyness: " << busyness << "%"; |
440 | 19 | } |
441 | | |
442 | 7 | FdbMetricExporter::~FdbMetricExporter() { |
443 | 7 | stop(); |
444 | 7 | } |
445 | | |
446 | 6 | int FdbMetricExporter::start() { |
447 | 6 | if (txn_kv_ == nullptr) return -1; |
448 | 6 | std::unique_lock lock(running_mtx_); |
449 | 6 | if (running_) { |
450 | 0 | return 0; |
451 | 0 | } |
452 | | |
453 | 6 | running_ = true; |
454 | 6 | thread_ = std::make_unique<std::thread>([this] { |
455 | 25 | while (running_.load(std::memory_order_acquire)) { |
456 | 19 | export_fdb_metrics(txn_kv_.get()); |
457 | 19 | std::unique_lock l(running_mtx_); |
458 | 19 | running_cond_.wait_for(l, std::chrono::milliseconds(sleep_interval_ms_), |
459 | 34 | [this]() { return !running_.load(std::memory_order_acquire); }); |
460 | 19 | } |
461 | 6 | }); |
462 | 6 | pthread_setname_np(thread_->native_handle(), "fdb_metrics_exporter"); |
463 | 6 | return 0; |
464 | 6 | } |
465 | | |
466 | 13 | void FdbMetricExporter::stop() { |
467 | 13 | { |
468 | 13 | std::unique_lock lock(running_mtx_); |
469 | 13 | running_.store(false); |
470 | 13 | running_cond_.notify_all(); |
471 | 13 | } |
472 | | |
473 | 13 | if (thread_ != nullptr && thread_->joinable()) { |
474 | 6 | thread_->join(); |
475 | 6 | thread_.reset(); |
476 | 6 | } |
477 | 13 | } |
478 | | |
479 | | } // namespace doris::cloud |