Coverage Report

Created: 2025-07-23 15:33

/root/doris/cloud/src/common/metric.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "metric.h"
19
20
#include <glog/logging.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/error/en.h>
24
25
#include <cstdint>
26
#include <memory>
27
#include <optional>
28
#include <set>
29
#include <string>
30
#include <string_view>
31
#include <unordered_map>
32
#include <vector>
33
34
#include "common/bvars.h"
35
#include "common/logging.h"
36
#include "meta-store/keys.h"
37
#include "meta-store/txn_kv.h"
38
#include "meta-store/txn_kv_error.h"
39
40
namespace doris::cloud {
41
// Defined in another translation unit. Used below to validate the first ("category") component
// of decoded partition-boundary keys; presumably the set of known key prefixes such as "meta" or
// "txn" — TODO confirm against the defining file.
extern std::set<std::string> get_key_prefix_contants();

// FoundationDB special key whose value is the machine-readable cluster status (JSON text).
// The format of the output is shown in "test/fdb_metric_example.json"
static const std::string FDB_STATUS_KEY {"\xff\xff/status/json"};
45
46
18
// Reads the value stored under FDB_STATUS_KEY (the cluster status JSON text) through a freshly
// created transaction.
// Returns an empty string, after logging a warning, when the transaction cannot be created or
// the read does not return TXN_OK.
static std::string get_fdb_status(TxnKv* txn_kv) {
    std::unique_ptr<Transaction> txn;
    if (TxnErrorCode create_err = txn_kv->create_txn(&txn); create_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create_txn, err=" << create_err;
        return "";
    }

    std::string status_json;
    if (TxnErrorCode get_err = txn->get(FDB_STATUS_KEY, &status_json);
        get_err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to get FDB_STATUS_KEY, err=" << get_err;
        return "";
    }
    return status_json;
}
61
62
// The format of fdb status details:
63
//
64
// Configuration:
65
//   Redundancy mode        - double
66
//   Storage engine         - ssd-2
67
//   Coordinators           - 3
68
//   Usable Regions         - 1
69
70
// Cluster:
71
//   FoundationDB processes - 15
72
//   Zones                  - 3
73
//   Machines               - 3
74
//   Memory availability    - 2.9 GB per process on machine with least available
75
//                            >>>>> (WARNING: 4.0 GB recommended) <<<<<
76
//   Retransmissions rate   - 3 Hz
77
//   Fault Tolerance        - 1 machines
78
//   Server time            - 02/16/23 16:48:14
79
80
// Data:
81
//   Replication health     - Healthy
82
//   Moving data            - 0.000 GB
83
//   Sum of key-value sizes - 4.317 GB
84
//   Disk space used        - 11.493 GB
85
86
// Operating space:
87
//   Storage server         - 462.8 GB free on most full server
88
//   Log server             - 462.8 GB free on most full server
89
90
// Workload:
91
//   Read rate              - 84 Hz
92
//   Write rate             - 4 Hz
93
//   Transactions started   - 222 Hz
94
//   Transactions committed - 4 Hz
95
//   Conflict rate          - 0 Hz
96
97
// Backup and DR:
98
//   Running backups        - 0
99
//   Running DRs            - 0
100
101
18
static void export_fdb_status_details(const std::string& status_str) {
102
18
    using namespace rapidjson;
103
18
    Document document;
104
18
    try {
105
18
        document.Parse(status_str.c_str());
106
18
        if (document.HasParseError()) {
107
11
            LOG(WARNING) << "fail to parse status str, err: "
108
11
                         << GetParseError_En(document.GetParseError());
109
11
            return;
110
11
        }
111
18
    } catch (std::exception& e) {
112
0
        LOG(WARNING) << "fail to parse status str, err: " << e.what();
113
0
        return;
114
0
    }
115
116
7
    if (!document.HasMember("cluster") || !document.HasMember("client")) {
117
2
        LOG(WARNING) << "err fdb status details";
118
2
        return;
119
2
    }
120
155
    auto get_value = [&](const std::vector<const char*>& v) -> int64_t {
121
155
        if (v.empty()) return BVAR_FDB_INVALID_VALUE;
122
155
        auto node = document.FindMember("cluster");
123
415
        for (const auto& name : v) {
124
415
            if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE;
125
413
            node = node->value.FindMember(name);
126
413
        }
127
153
        if (node->value.IsInt64()) return node->value.GetInt64();
128
43
        if (node->value.IsDouble()) return static_cast<int64_t>(node->value.GetDouble());
129
13
        if (node->value.IsObject()) return node->value.MemberCount();
130
5
        if (node->value.IsArray()) return node->value.Size();
131
0
        return BVAR_FDB_INVALID_VALUE;
132
5
    };
133
25
    auto get_nanoseconds = [&](const std::vector<const char*>& v) -> int64_t {
134
25
        constexpr double NANOSECONDS = 1e9;
135
25
        auto node = document.FindMember("cluster");
136
60
        for (const auto& name : v) {
137
60
            if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE;
138
60
            node = node->value.FindMember(name);
139
60
        }
140
25
        if (node->value.IsInt64()) return node->value.GetInt64() * NANOSECONDS;
141
20
        DCHECK(node->value.IsDouble());
142
20
        return static_cast<int64_t>(node->value.GetDouble() * NANOSECONDS);
143
25
    };
144
15
    auto get_process_metric = [&](std::string component) {
145
15
        auto node = document.FindMember("cluster");
146
15
        if (!node->value.HasMember("processes")) return;
147
15
        node = node->value.FindMember("processes");
148
        // process
149
240
        for (auto process_node = node->value.MemberBegin(); process_node != node->value.MemberEnd();
150
225
             process_node++) {
151
225
            const char* process_id = process_node->name.GetString();
152
225
            decltype(process_node) component_node;
153
            // get component iter
154
225
            if (!process_node->value.HasMember(component.data())) return;
155
225
            component_node = process_node->value.FindMember(component.data());
156
            // There are three cases here: int64, double, and object.
157
            // If it is double or int64, put it directly into the bvar.
158
            // If it is an object, recursively obtain the full name and corresponding value.
159
            // such as: {"disk": {"reads": {"counter": 123, "hz": 0}}}
160
            // component is "disk", the names of these two values should be "reads_counter" and "reads_hz"
161
225
            auto recursive_name_helper = [](std::string& origin_name,
162
450
                                            const char* next_level_name) -> std::string {
163
450
                return origin_name + '_' + next_level_name;
164
450
            };
165
            // proved two type lambda func to handle object and other type
166
167
            // set_bvar_value is responsible for setting integer and float values to the corresponding bvar.
168
225
            auto set_bvar_value = [&process_id, &component](
169
225
                                          std::string& name,
170
1.12k
                                          decltype(process_node)& temp_node) -> void {
171
1.12k
                if (temp_node->value.IsInt64()) {
172
900
                    g_bvar_fdb_process_status_int.put({process_id, component, name},
173
900
                                                      temp_node->value.GetInt64());
174
900
                    return;
175
900
                }
176
225
                if (temp_node->value.IsDouble()) {
177
225
                    g_bvar_fdb_process_status_float.put({process_id, component, name},
178
225
                                                        temp_node->value.GetDouble());
179
225
                    return;
180
225
                }
181
0
                LOG(WARNING) << fmt::format(
182
0
                        "Get process metrics set_bvar_value input a wrong type node {}", name);
183
0
            };
184
225
            auto object_recursive = [&set_bvar_value, &recursive_name_helper](
185
225
                                            auto&& self, std::string name,
186
1.27k
                                            decltype(process_node) temp_node) -> void {
187
                // if the node is an object, then get Member(iter) and recursive with iter as arg
188
1.27k
                if (temp_node->value.IsObject()) {
189
150
                    for (auto iter = temp_node->value.MemberBegin();
190
600
                         iter != temp_node->value.MemberEnd(); iter++) {
191
450
                        self(self, recursive_name_helper(name, iter->name.GetString()), iter);
192
450
                    }
193
150
                    return;
194
150
                }
195
                // if not object, set bvar value
196
1.12k
                set_bvar_value(name, temp_node);
197
1.12k
            };
198
            // Note that the parameter passed to set_bvar_value here is the current node, not its Member
199
            // so we can directly call object_recursive in the loop
200
225
            for (auto metric_node = component_node->value.MemberBegin();
201
1.05k
                 metric_node != component_node->value.MemberEnd(); metric_node++) {
202
825
                object_recursive(object_recursive, metric_node->name.GetString(), metric_node);
203
825
            }
204
225
        }
205
15
    };
206
    // Configuration
207
5
    g_bvar_fdb_configuration_coordinators_count.set_value(
208
5
            get_value({"configuration", "coordinators_count"}));
209
5
    g_bvar_fdb_configuration_usable_regions.set_value(
210
5
            get_value({"configuration", "usable_regions"}));
211
212
    // Cluster
213
5
    g_bvar_fdb_process_count.set_value(get_value({"processes"}));
214
5
    g_bvar_fdb_machines_count.set_value(get_value({"machines"}));
215
5
    g_bvar_fdb_fault_tolerance_count.set_value(
216
5
            get_value({"fault_tolerance", "max_zone_failures_without_losing_data"}));
217
5
    g_bvar_fdb_generation.set_value(get_value({"generation"}));
218
5
    g_bvar_fdb_incompatible_connections.set_value(get_value({"incompatible_connections"}));
219
220
    // Data/Operating space
221
5
    g_bvar_fdb_data_average_partition_size_bytes.set_value(
222
5
            get_value({"data", "average_partition_size_bytes"}));
223
5
    g_bvar_fdb_data_partition_count.set_value(get_value({"data", "partitions_count"}));
224
5
    g_bvar_fdb_data_total_disk_used_bytes.set_value(get_value({"data", "total_disk_used_bytes"}));
225
5
    g_bvar_fdb_data_total_kv_size_bytes.set_value(get_value({"data", "total_kv_size_bytes"}));
226
5
    g_bvar_fdb_data_log_server_space_bytes.set_value(
227
5
            get_value({"data", "least_operating_space_bytes_log_server"}));
228
5
    g_bvar_fdb_data_storage_server_space_bytes.set_value(
229
5
            get_value({"data", "least_operating_space_bytes_storage_server"}));
230
5
    g_bvar_fdb_data_moving_data_highest_priority.set_value(
231
5
            get_value({"data", "moving_data", "highest_priority"}));
232
5
    g_bvar_fdb_data_moving_data_in_flight_bytes.set_value(
233
5
            get_value({"data", "moving_data", "in_flight_bytes"}));
234
5
    g_bvar_fdb_data_moving_data_in_queue_bytes.set_value(
235
5
            get_value({"data", "moving_data", "in_queue_bytes"}));
236
5
    g_bvar_fdb_data_moving_total_written_bytes.set_value(
237
5
            get_value({"data", "moving_data", "total_written_bytes"}));
238
5
    g_bvar_fdb_data_state_min_replicas_remaining.set_value(
239
5
            get_value({"data", "state", "min_replicas_remaining"}));
240
241
    // Latency probe
242
5
    g_bvar_fdb_latency_probe_transaction_start_ns.set_value(
243
5
            get_nanoseconds({"latency_probe", "transaction_start_seconds"}));
244
5
    g_bvar_fdb_latency_probe_commit_ns.set_value(
245
5
            get_nanoseconds({"latency_probe", "commit_seconds"}));
246
5
    g_bvar_fdb_latency_probe_read_ns.set_value(get_nanoseconds({"latency_probe", "read_seconds"}));
247
248
    // Workload
249
5
    g_bvar_fdb_workload_conflict_rate_hz.set_value(
250
5
            get_value({"workload", "transactions", "conflicted", "hz"}));
251
5
    g_bvar_fdb_workload_location_rate_hz.set_value(
252
5
            get_value({"workload", "operations", "location_requests", "hz"}));
253
5
    g_bvar_fdb_workload_keys_read_hz.set_value(get_value({"workload", "keys", "read", "hz"}));
254
5
    g_bvar_fdb_workload_read_bytes_hz.set_value(get_value({"workload", "bytes", "read", "hz"}));
255
5
    g_bvar_fdb_workload_read_rate_hz.set_value(
256
5
            get_value({"workload", "operations", "reads", "hz"}));
257
5
    g_bvar_fdb_workload_written_bytes_hz.set_value(
258
5
            get_value({"workload", "bytes", "written", "hz"}));
259
5
    g_bvar_fdb_workload_write_rate_hz.set_value(
260
5
            get_value({"workload", "operations", "writes", "hz"}));
261
5
    g_bvar_fdb_workload_transactions_started_hz.set_value(
262
5
            get_value({"workload", "transactions", "started", "hz"}));
263
5
    g_bvar_fdb_workload_transactions_committed_hz.set_value(
264
5
            get_value({"workload", "transactions", "committed", "hz"}));
265
5
    g_bvar_fdb_workload_transactions_rejected_hz.set_value(
266
5
            get_value({"workload", "transactions", "rejected_for_queued_too_long", "hz"}));
267
268
    // QOS
269
5
    g_bvar_fdb_qos_worst_data_lag_storage_server_ns.set_value(
270
5
            get_nanoseconds({"qos", "worst_data_lag_storage_server", "seconds"}));
271
5
    g_bvar_fdb_qos_worst_durability_lag_storage_server_ns.set_value(
272
5
            get_nanoseconds({"qos", "worst_durability_lag_storage_server", "seconds"}));
273
5
    g_bvar_fdb_qos_worst_log_server_queue_bytes.set_value(
274
5
            get_value({"qos", "worst_queue_bytes_log_server"}));
275
5
    g_bvar_fdb_qos_worst_storage_server_queue_bytes.set_value(
276
5
            get_value({"qos", "worst_queue_bytes_storage_server"}));
277
278
    // Backup and DR
279
280
    // Client Count
281
5
    g_bvar_fdb_client_count.set_value(get_value({"clients", "count"}));
282
283
    // Coordinators Unreachable Count
284
5
    auto unreachable_count = 0;
285
5
    if (auto node = document.FindMember("client"); node->value.HasMember("coordinators")) {
286
5
        if (node = node->value.FindMember("coordinators"); node->value.HasMember("coordinators")) {
287
5
            if (node = node->value.FindMember("coordinators"); node->value.IsArray()) {
288
15
                for (const auto& c : node->value.GetArray()) {
289
15
                    if (c.HasMember("reachable") && c.FindMember("reachable")->value.IsBool() &&
290
15
                        !c.FindMember("reachable")->value.GetBool()) {
291
0
                        ++unreachable_count;
292
0
                    }
293
15
                }
294
5
                g_bvar_fdb_coordinators_unreachable_count.set_value(unreachable_count);
295
5
            }
296
5
        }
297
5
    }
298
299
    // Process Status
300
5
    get_process_metric("cpu");
301
5
    get_process_metric("disk");
302
5
    get_process_metric("memory");
303
5
}
304
305
// boundaries include the key category{meta, txn, recycle...}, instance_id and sub_category{rowset, txn_label...}
306
// encode look like
307
// 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
308
// 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version}
309
// the func count same key to hashmap kv_range_count
310
// exmaple:
311
// kv_range_boundaries: meta|instance1|rowset|..., meta|instance1|rowset|..., meta|instance2|rowset|..., txn|instance1|txn_label|...
312
// kv_range_count output: <meta|instance1|rowset, 2>, <meta|instance2|rowset, 1>, <txn|instance1|txn_label, 1>
313
void get_kv_range_boundaries_count(std::vector<std::string>& kv_range_boundaries,
314
0
                                   std::unordered_map<std::string, size_t>& kv_range_count) {
315
0
    size_t prefix_size = FdbTxnKv::fdb_partition_key_prefix().size();
316
0
    for (auto&& boundary : kv_range_boundaries) {
317
0
        if (boundary.size() < prefix_size + 1 || boundary[prefix_size] != CLOUD_USER_KEY_SPACE01) {
318
0
            continue;
319
0
        }
320
321
0
        std::string_view user_key(boundary);
322
0
        user_key.remove_prefix(prefix_size + 1); // Skip the KEY_SPACE prefix.
323
0
        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
324
0
        decode_key(&user_key, &out); // ignore any error, since the boundary key might be truncated.
325
326
0
        auto visitor = [](auto&& arg) -> std::string {
327
0
            using T = std::decay_t<decltype(arg)>;
328
0
            if constexpr (std::is_same_v<T, std::string>) {
329
0
                return arg;
330
0
            } else {
331
0
                return std::to_string(arg);
332
0
            }
333
0
        };
Unexecuted instantiation: metric.cpp:_ZZN5doris5cloud29get_kv_range_boundaries_countERSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaIS7_EERSt13unordered_mapIS7_mSt4hashIS7_ESt8equal_toIS7_ESaISt4pairIKS7_mEEEENK3$_0clIRlEES7_OT_
Unexecuted instantiation: metric.cpp:_ZZN5doris5cloud29get_kv_range_boundaries_countERSt6vectorINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEESaIS7_EERSt13unordered_mapIS7_mSt4hashIS7_ESt8equal_toIS7_ESaISt4pairIKS7_mEEEENK3$_0clIRS7_EES7_OT_
334
335
0
        if (!out.empty()) {
336
0
            std::string key;
337
            // whatever the boundary's category have similar encode part:
338
            // category, instance_id, sub_category
339
            // we can distinguish boundary using the three parts
340
            // some boundaries do not contain all three parts, so restrictions based on size are also necessary
341
0
            for (size_t i = 0; i < 3 && i < out.size(); ++i) {
342
0
                key += std::visit(visitor, std::get<0>(out[i])) + '|';
343
0
            }
344
0
            key.pop_back();
345
0
            kv_range_count[key]++;
346
0
        }
347
0
    }
348
0
}
349
350
18
// Publishes the number of FDB partitions (key ranges) per "category|instance_id|sub_category"
// group to g_bvar_fdb_kv_ranges_count.
//
// Only works for FdbTxnKv. Any other TxnKv is logged and ignored. Groups whose category is not
// in get_key_prefix_contants() are logged and not exported.
static void export_fdb_kv_ranges_details(TxnKv* kv) {
    auto* txn_kv = dynamic_cast<FdbTxnKv*>(kv);
    if (!txn_kv) {
        LOG(WARNING) << "this method only support fdb txn kv";
        return;
    }

    std::vector<std::string> partition_boundaries;
    TxnErrorCode code = txn_kv->get_partition_boundaries(&partition_boundaries);
    if (code != TxnErrorCode::TXN_OK) {
        // Previously this message was formatted into a local and then dropped, so failures were
        // invisible. Log it.
        LOG(WARNING) << fmt::format("failed to get boundaries, code={}", code);
        return;
    }

    std::unordered_map<std::string, size_t> partition_count;
    get_kv_range_boundaries_count(partition_boundaries, partition_count);

    const auto key_prefix_set = get_key_prefix_contants();
    for (auto&& [key, count] : partition_count) {
        // Split "category|instance_id|sub_category" on '|'. Missing trailing parts become "".
        std::vector<std::string> keys;
        size_t pos {};
        do {
            size_t p = std::min(key.size(), key.find('|', pos));
            keys.emplace_back(key.substr(pos, p - pos));
            pos = p + 1;
        } while (pos < key.size());
        keys.resize(3);

        if (!key_prefix_set.contains(keys[0])) {
            LOG(WARNING) << fmt::format("Unknow meta range type: {}", keys[0]);
            continue;
        }
        g_bvar_fdb_kv_ranges_count.put({keys[0], keys[1], keys[2]}, count);
    }
}
388
389
18
// One collection round:
// 1. Export the metrics derived from the FDB status JSON.
// 2. Export the per-range partition counts.
// 3. If `txn_kv` is an FdbTxnKv, publish the client thread busyness as a percentage.
// The busyness logged at the end is 0 when `txn_kv` is not an FdbTxnKv.
void FdbMetricExporter::export_fdb_metrics(TxnKv* txn_kv) {
    export_fdb_status_details(get_fdb_status(txn_kv));
    export_fdb_kv_ranges_details(txn_kv);

    int64_t busyness_percent = 0;
    auto* fdb_kv = dynamic_cast<FdbTxnKv*>(txn_kv);
    if (fdb_kv != nullptr) {
        busyness_percent = static_cast<int64_t>(fdb_kv->get_client_thread_busyness() * 100);
        g_bvar_fdb_client_thread_busyness_percent.set_value(busyness_percent);
    }
    LOG(INFO) << "finish to collect fdb metric, client busyness: " << busyness_percent << "%";
}
400
401
8
// Delegates to stop(), which signals and joins the exporter thread, so that the thread (which
// captures `this`) cannot outlive the exporter.
FdbMetricExporter::~FdbMetricExporter() { stop(); }
404
405
7
int FdbMetricExporter::start() {
406
7
    if (txn_kv_ == nullptr) return -1;
407
7
    std::unique_lock lock(running_mtx_);
408
7
    if (running_) {
409
0
        return 0;
410
0
    }
411
412
7
    running_ = true;
413
7
    thread_ = std::make_unique<std::thread>([this] {
414
25
        while (running_.load(std::memory_order_acquire)) {
415
18
            export_fdb_metrics(txn_kv_.get());
416
18
            std::unique_lock l(running_mtx_);
417
18
            running_cond_.wait_for(l, std::chrono::milliseconds(sleep_interval_ms_),
418
31
                                   [this]() { return !running_.load(std::memory_order_acquire); });
419
18
        }
420
7
    });
421
7
    pthread_setname_np(thread_->native_handle(), "fdb_metrics_exporter");
422
7
    return 0;
423
7
}
424
425
14
void FdbMetricExporter::stop() {
426
14
    {
427
14
        std::unique_lock lock(running_mtx_);
428
14
        running_.store(false);
429
14
        running_cond_.notify_all();
430
14
    }
431
432
14
    if (thread_ != nullptr && thread_->joinable()) {
433
7
        thread_->join();
434
7
        thread_.reset();
435
7
    }
436
14
}
437
438
} // namespace doris::cloud