Coverage Report

Created: 2025-05-20 16:26

/root/doris/cloud/src/common/metric.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "metric.h"
19
20
#include <rapidjson/document.h>
21
#include <rapidjson/encodings.h>
22
#include <rapidjson/error/en.h>
23
24
#include <memory>
25
#include <optional>
26
#include <string>
27
#include <string_view>
28
#include <vector>
29
30
#include "common/bvars.h"
31
#include "meta-service/txn_kv.h"
32
#include "meta-service/txn_kv_error.h"
33
34
namespace doris::cloud {
35
36
// The format of the output is shown in "test/fdb_metric_example.json"
37
// Key read by get_fdb_status(); the value stored under it is parsed as JSON by
// export_fdb_status_details().
static const std::string FDB_STATUS_KEY = "\xff\xff/status/json";
38
39
18
// Reads the value stored under FDB_STATUS_KEY through a freshly created transaction.
//
// Returns the raw value on success. If either creating the transaction or the
// read itself fails, a warning is logged and an empty string is returned.
static std::string get_fdb_status(TxnKv* txn_kv) {
    std::unique_ptr<Transaction> txn;
    if (TxnErrorCode err = txn_kv->create_txn(&txn); err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to create_txn, err=" << err;
        return "";
    }

    std::string status_json;
    if (TxnErrorCode err = txn->get(FDB_STATUS_KEY, &status_json);
        err != TxnErrorCode::TXN_OK) {
        LOG(WARNING) << "failed to get FDB_STATUS_KEY, err=" << err;
        return "";
    }
    return status_json;
}
54
55
// The format of fdb status details:
56
//
57
// Configuration:
58
//   Redundancy mode        - double
59
//   Storage engine         - ssd-2
60
//   Coordinators           - 3
61
//   Usable Regions         - 1
62
63
// Cluster:
64
//   FoundationDB processes - 15
65
//   Zones                  - 3
66
//   Machines               - 3
67
//   Memory availability    - 2.9 GB per process on machine with least available
68
//                            >>>>> (WARNING: 4.0 GB recommended) <<<<<
69
//   Retransmissions rate   - 3 Hz
70
//   Fault Tolerance        - 1 machines
71
//   Server time            - 02/16/23 16:48:14
72
73
// Data:
74
//   Replication health     - Healthy
75
//   Moving data            - 0.000 GB
76
//   Sum of key-value sizes - 4.317 GB
77
//   Disk space used        - 11.493 GB
78
79
// Operating space:
80
//   Storage server         - 462.8 GB free on most full server
81
//   Log server             - 462.8 GB free on most full server
82
83
// Workload:
84
//   Read rate              - 84 Hz
85
//   Write rate             - 4 Hz
86
//   Transactions started   - 222 Hz
87
//   Transactions committed - 4 Hz
88
//   Conflict rate          - 0 Hz
89
90
// Backup and DR:
91
//   Running backups        - 0
92
//   Running DRs            - 0
93
94
18
static void export_fdb_status_details(const std::string& status_str) {
95
18
    using namespace rapidjson;
96
18
    Document document;
97
18
    try {
98
18
        document.Parse(status_str.c_str());
99
18
        if (document.HasParseError()) {
100
11
            LOG(WARNING) << "fail to parse status str, err: "
101
11
                         << GetParseError_En(document.GetParseError());
102
11
            return;
103
11
        }
104
18
    } catch (std::exception& e) {
105
0
        LOG(WARNING) << "fail to parse status str, err: " << e.what();
106
0
        return;
107
0
    }
108
109
7
    if (!document.HasMember("cluster") || !document.HasMember("client")) {
110
2
        LOG(WARNING) << "err fdb status details";
111
2
        return;
112
2
    }
113
155
    auto get_value = [&](const std::vector<const char*>& v) -> int64_t {
114
155
        if (v.empty()) return BVAR_FDB_INVALID_VALUE;
115
155
        auto node = document.FindMember("cluster");
116
415
        for (const auto& name : v) {
117
415
            if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE;
118
413
            node = node->value.FindMember(name);
119
413
        }
120
153
        if (node->value.IsInt64()) return node->value.GetInt64();
121
43
        if (node->value.IsDouble()) return static_cast<int64_t>(node->value.GetDouble());
122
13
        if (node->value.IsObject()) return node->value.MemberCount();
123
5
        if (node->value.IsArray()) return node->value.Size();
124
0
        return BVAR_FDB_INVALID_VALUE;
125
5
    };
126
25
    auto get_nanoseconds = [&](const std::vector<const char*>& v) -> int64_t {
127
25
        constexpr double NANOSECONDS = 1e9;
128
25
        auto node = document.FindMember("cluster");
129
60
        for (const auto& name : v) {
130
60
            if (!node->value.HasMember(name)) return BVAR_FDB_INVALID_VALUE;
131
60
            node = node->value.FindMember(name);
132
60
        }
133
25
        if (node->value.IsInt64()) return node->value.GetInt64() * NANOSECONDS;
134
20
        DCHECK(node->value.IsDouble());
135
20
        return static_cast<int64_t>(node->value.GetDouble() * NANOSECONDS);
136
25
    };
137
    // Configuration
138
5
    g_bvar_fdb_configuration_coordinators_count.set_value(
139
5
            get_value({"configuration", "coordinators_count"}));
140
5
    g_bvar_fdb_configuration_usable_regions.set_value(
141
5
            get_value({"configuration", "usable_regions"}));
142
143
    // Cluster
144
5
    g_bvar_fdb_process_count.set_value(get_value({"processes"}));
145
5
    g_bvar_fdb_machines_count.set_value(get_value({"machines"}));
146
5
    g_bvar_fdb_fault_tolerance_count.set_value(
147
5
            get_value({"fault_tolerance", "max_zone_failures_without_losing_data"}));
148
5
    g_bvar_fdb_generation.set_value(get_value({"generation"}));
149
5
    g_bvar_fdb_incompatible_connections.set_value(get_value({"incompatible_connections"}));
150
151
    // Data/Operating space
152
5
    g_bvar_fdb_data_average_partition_size_bytes.set_value(
153
5
            get_value({"data", "average_partition_size_bytes"}));
154
5
    g_bvar_fdb_data_partition_count.set_value(get_value({"data", "partitions_count"}));
155
5
    g_bvar_fdb_data_total_disk_used_bytes.set_value(get_value({"data", "total_disk_used_bytes"}));
156
5
    g_bvar_fdb_data_total_kv_size_bytes.set_value(get_value({"data", "total_kv_size_bytes"}));
157
5
    g_bvar_fdb_data_log_server_space_bytes.set_value(
158
5
            get_value({"data", "least_operating_space_bytes_log_server"}));
159
5
    g_bvar_fdb_data_storage_server_space_bytes.set_value(
160
5
            get_value({"data", "least_operating_space_bytes_storage_server"}));
161
5
    g_bvar_fdb_data_moving_data_highest_priority.set_value(
162
5
            get_value({"data", "moving_data", "highest_priority"}));
163
5
    g_bvar_fdb_data_moving_data_in_flight_bytes.set_value(
164
5
            get_value({"data", "moving_data", "in_flight_bytes"}));
165
5
    g_bvar_fdb_data_moving_data_in_queue_bytes.set_value(
166
5
            get_value({"data", "moving_data", "in_queue_bytes"}));
167
5
    g_bvar_fdb_data_moving_total_written_bytes.set_value(
168
5
            get_value({"data", "moving_data", "total_written_bytes"}));
169
5
    g_bvar_fdb_data_state_min_replicas_remaining.set_value(
170
5
            get_value({"data", "state", "min_replicas_remaining"}));
171
172
    // Latency probe
173
5
    g_bvar_fdb_latency_probe_transaction_start_ns.set_value(
174
5
            get_nanoseconds({"latency_probe", "transaction_start_seconds"}));
175
5
    g_bvar_fdb_latency_probe_commit_ns.set_value(
176
5
            get_nanoseconds({"latency_probe", "commit_seconds"}));
177
5
    g_bvar_fdb_latency_probe_read_ns.set_value(get_nanoseconds({"latency_probe", "read_seconds"}));
178
179
    // Workload
180
5
    g_bvar_fdb_workload_conflict_rate_hz.set_value(
181
5
            get_value({"workload", "transactions", "conflicted", "hz"}));
182
5
    g_bvar_fdb_workload_location_rate_hz.set_value(
183
5
            get_value({"workload", "operations", "location_requests", "hz"}));
184
5
    g_bvar_fdb_workload_keys_read_hz.set_value(get_value({"workload", "keys", "read", "hz"}));
185
5
    g_bvar_fdb_workload_read_bytes_hz.set_value(get_value({"workload", "bytes", "read", "hz"}));
186
5
    g_bvar_fdb_workload_read_rate_hz.set_value(
187
5
            get_value({"workload", "operations", "reads", "hz"}));
188
5
    g_bvar_fdb_workload_written_bytes_hz.set_value(
189
5
            get_value({"workload", "bytes", "written", "hz"}));
190
5
    g_bvar_fdb_workload_write_rate_hz.set_value(
191
5
            get_value({"workload", "operations", "writes", "hz"}));
192
5
    g_bvar_fdb_workload_transactions_started_hz.set_value(
193
5
            get_value({"workload", "transactions", "started", "hz"}));
194
5
    g_bvar_fdb_workload_transactions_committed_hz.set_value(
195
5
            get_value({"workload", "transactions", "committed", "hz"}));
196
5
    g_bvar_fdb_workload_transactions_rejected_hz.set_value(
197
5
            get_value({"workload", "transactions", "rejected_for_queued_too_long", "hz"}));
198
199
    // QOS
200
5
    g_bvar_fdb_qos_worst_data_lag_storage_server_ns.set_value(
201
5
            get_nanoseconds({"qos", "worst_data_lag_storage_server", "seconds"}));
202
5
    g_bvar_fdb_qos_worst_durability_lag_storage_server_ns.set_value(
203
5
            get_nanoseconds({"qos", "worst_durability_lag_storage_server", "seconds"}));
204
5
    g_bvar_fdb_qos_worst_log_server_queue_bytes.set_value(
205
5
            get_value({"qos", "worst_queue_bytes_log_server"}));
206
5
    g_bvar_fdb_qos_worst_storage_server_queue_bytes.set_value(
207
5
            get_value({"qos", "worst_queue_bytes_storage_server"}));
208
209
    // Backup and DR
210
211
    // Client Count
212
5
    g_bvar_fdb_client_count.set_value(get_value({"clients", "count"}));
213
214
    // Coordinators Unreachable Count
215
5
    auto unreachable_count = 0;
216
5
    if (auto node = document.FindMember("client"); node->value.HasMember("coordinators")) {
217
5
        if (node = node->value.FindMember("coordinators"); node->value.HasMember("coordinators")) {
218
5
            if (node = node->value.FindMember("coordinators"); node->value.IsArray()) {
219
15
                for (const auto& c : node->value.GetArray()) {
220
15
                    if (c.HasMember("reachable") && c.FindMember("reachable")->value.IsBool() &&
221
15
                        !c.FindMember("reachable")->value.GetBool()) {
222
0
                        ++unreachable_count;
223
0
                    }
224
15
                }
225
5
                g_bvar_fdb_coordinators_unreachable_count.set_value(unreachable_count);
226
5
            }
227
5
        }
228
5
    }
229
5
}
230
231
18
// Runs one collection round: fetches the FDB status document via `txn_kv`, exports
// its details, and, when `txn_kv` is an FdbTxnKv, also publishes the client thread
// busyness as a percentage. The logged busyness stays 0 for other TxnKv types.
void FdbMetricExporter::export_fdb_metrics(TxnKv* txn_kv) {
    export_fdb_status_details(get_fdb_status(txn_kv));

    int64_t busyness = 0;
    auto* fdb_kv = dynamic_cast<FdbTxnKv*>(txn_kv);
    if (fdb_kv != nullptr) {
        busyness = static_cast<int64_t>(fdb_kv->get_client_thread_busyness() * 100);
        g_bvar_fdb_client_thread_busyness_percent.set_value(busyness);
    }
    LOG(INFO) << "finish to collect fdb metric, client busyness: " << busyness << "%";
}
241
242
7
// Shuts the exporter down via stop() before the object goes away.
FdbMetricExporter::~FdbMetricExporter() {
    this->stop();
}
245
246
6
int FdbMetricExporter::start() {
247
6
    if (txn_kv_ == nullptr) return -1;
248
6
    std::unique_lock lock(running_mtx_);
249
6
    if (running_) {
250
0
        return 0;
251
0
    }
252
253
6
    running_ = true;
254
6
    thread_ = std::make_unique<std::thread>([this] {
255
24
        while (running_.load(std::memory_order_acquire)) {
256
18
            export_fdb_metrics(txn_kv_.get());
257
18
            std::unique_lock l(running_mtx_);
258
18
            running_cond_.wait_for(l, std::chrono::milliseconds(sleep_interval_ms_),
259
33
                                   [this]() { return !running_.load(std::memory_order_acquire); });
260
18
        }
261
6
    });
262
6
    pthread_setname_np(thread_->native_handle(), "fdb_metrics_exporter");
263
6
    return 0;
264
6
}
265
266
12
void FdbMetricExporter::stop() {
267
12
    {
268
12
        std::unique_lock lock(running_mtx_);
269
12
        running_.store(false);
270
12
        running_cond_.notify_all();
271
12
    }
272
273
12
    if (thread_ != nullptr && thread_->joinable()) {
274
6
        thread_->join();
275
6
        thread_.reset();
276
6
    }
277
12
}
278
279
} // namespace doris::cloud