Coverage Report

Created: 2025-04-22 12:18

/root/doris/be/src/agent/utils.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/utils.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fmt/format.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/Types_types.h>
26
#include <glog/logging.h>
27
#include <rapidjson/document.h>
28
#include <rapidjson/encodings.h>
29
#include <rapidjson/rapidjson.h>
30
#include <rapidjson/stringbuffer.h>
31
#include <rapidjson/writer.h>
32
#include <stdint.h>
33
#include <stdlib.h>
34
#include <string.h>
35
#include <thrift/transport/TTransportException.h>
36
37
#include <cstdio>
38
#include <exception>
39
#include <fstream>
40
#include <memory>
41
#include <utility>
42
43
#include "common/config.h"
44
#include "common/status.h"
45
#include "runtime/client_cache.h"
46
#include "runtime/cluster_info.h"
47
48
namespace doris {
49
class TConfirmUnusedRemoteFilesRequest;
50
class TConfirmUnusedRemoteFilesResult;
51
class TFinishTaskRequest;
52
class TMasterResult;
53
class TReportRequest;
54
} // namespace doris
55
56
using apache::thrift::transport::TTransportException;
57
58
namespace doris {
59
60
static std::unique_ptr<MasterServerClient> s_client;
61
62
2
MasterServerClient* MasterServerClient::create(const ClusterInfo* cluster_info) {
63
2
    s_client.reset(new MasterServerClient(cluster_info));
64
2
    return s_client.get();
65
2
}
66
67
23.0k
MasterServerClient* MasterServerClient::instance() {
68
23.0k
    return s_client.get();
69
23.0k
}
70
71
MasterServerClient::MasterServerClient(const ClusterInfo* cluster_info)
72
        : _cluster_info(cluster_info),
73
          _client_cache(std::make_unique<FrontendServiceClientCache>(
74
2
                  config::max_master_fe_client_cache_size)) {
75
2
    _client_cache->init_metrics("master_fe");
76
2
}
77
78
23.0k
Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMasterResult* result) {
79
23.0k
    Status client_status;
80
23.0k
    FrontendServiceConnection client(_client_cache.get(), _cluster_info->master_fe_addr,
81
23.0k
                                     config::thrift_rpc_timeout_ms, &client_status);
82
83
23.0k
    if (!client_status.ok()) {
84
0
        LOG(WARNING) << "fail to get master client from cache. "
85
0
                     << "host=" << _cluster_info->master_fe_addr.hostname
86
0
                     << ", port=" << _cluster_info->master_fe_addr.port
87
0
                     << ", code=" << client_status.code();
88
0
        return Status::InternalError("Failed to get master client");
89
0
    }
90
91
23.0k
    try {
92
23.0k
        try {
93
23.0k
            client->finishTask(*result, request);
94
23.0k
        } catch ([[maybe_unused]] TTransportException& e) {
95
#ifndef ADDRESS_SANITIZER
96
            LOG(WARNING) << "master client, retry finishTask: " << e.what();
97
#endif
98
0
            client_status = client.reopen(config::thrift_rpc_timeout_ms);
99
0
            if (!client_status.ok()) {
100
#ifndef ADDRESS_SANITIZER
101
                LOG(WARNING) << "fail to get master client from cache. "
102
                             << "host=" << _cluster_info->master_fe_addr.hostname
103
                             << ", port=" << _cluster_info->master_fe_addr.port
104
                             << ", code=" << client_status.code();
105
#endif
106
0
                return Status::RpcError("Master client finish task failed");
107
0
            }
108
0
            client->finishTask(*result, request);
109
0
        }
110
23.0k
    } catch (std::exception& e) {
111
0
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
112
0
        LOG(WARNING) << "fail to finish_task. "
113
0
                     << "host=" << _cluster_info->master_fe_addr.hostname
114
0
                     << ", port=" << _cluster_info->master_fe_addr.port << ", error=" << e.what();
115
0
        return Status::InternalError("Fail to finish task");
116
0
    }
117
118
23.0k
    return Status::OK();
119
23.0k
}
120
121
57
Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
122
57
    Status client_status;
123
57
    FrontendServiceConnection client(_client_cache.get(), _cluster_info->master_fe_addr,
124
57
                                     config::thrift_rpc_timeout_ms, &client_status);
125
126
57
    if (!client_status.ok()) {
127
0
        LOG(WARNING) << "fail to get master client from cache. "
128
0
                     << "host=" << _cluster_info->master_fe_addr.hostname
129
0
                     << ", port=" << _cluster_info->master_fe_addr.port
130
0
                     << ", code=" << client_status;
131
0
        return Status::InternalError("Fail to get master client from cache");
132
0
    }
133
134
57
    try {
135
57
        try {
136
57
            client->report(*result, request);
137
57
        } catch (TTransportException& e) {
138
3
            TTransportException::TTransportExceptionType type = e.getType();
139
3
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
140
#ifndef ADDRESS_SANITIZER
141
                // if not TIMED_OUT, retry
142
                LOG(WARNING) << "master client, retry finishTask: " << e.what();
143
#endif
144
145
3
                client_status = client.reopen(config::thrift_rpc_timeout_ms);
146
3
                if (!client_status.ok()) {
147
#ifndef ADDRESS_SANITIZER
148
                    LOG(WARNING) << "fail to get master client from cache. "
149
                                 << "host=" << _cluster_info->master_fe_addr.hostname
150
                                 << ", port=" << _cluster_info->master_fe_addr.port
151
                                 << ", code=" << client_status.code();
152
#endif
153
3
                    return Status::InternalError("Fail to get master client from cache");
154
3
                }
155
156
0
                client->report(*result, request);
157
0
            } else {
158
                // TIMED_OUT exception. do not retry
159
                // actually we don't care what FE returns.
160
#ifndef ADDRESS_SANITIZER
161
                LOG(WARNING) << "fail to report to master: " << e.what();
162
#endif
163
0
                return Status::InternalError("Fail to report to master");
164
0
            }
165
3
        }
166
57
    } catch (std::exception& e) {
167
0
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
168
0
        LOG(WARNING) << "fail to report to master. "
169
0
                     << "host=" << _cluster_info->master_fe_addr.hostname
170
0
                     << ", port=" << _cluster_info->master_fe_addr.port
171
0
                     << ", code=" << client_status.code() << ", reason=" << e.what();
172
0
        return Status::InternalError("Fail to report to master");
173
0
    }
174
175
54
    return Status::OK();
176
57
}
177
178
Status MasterServerClient::confirm_unused_remote_files(
179
0
        const TConfirmUnusedRemoteFilesRequest& request, TConfirmUnusedRemoteFilesResult* result) {
180
0
    Status client_status;
181
0
    FrontendServiceConnection client(_client_cache.get(), _cluster_info->master_fe_addr,
182
0
                                     config::thrift_rpc_timeout_ms, &client_status);
183
184
0
    if (!client_status.ok()) {
185
0
        return Status::InternalError(
186
0
                "fail to get master client from cache. host={}, port={}, code={}",
187
0
                _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
188
0
                client_status.code());
189
0
    }
190
0
    try {
191
0
        try {
192
0
            client->confirmUnusedRemoteFiles(*result, request);
193
0
        } catch (TTransportException& e) {
194
0
            TTransportException::TTransportExceptionType type = e.getType();
195
0
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
196
#ifndef ADDRESS_SANITIZER
197
                // if not TIMED_OUT, retry
198
                LOG(WARNING) << "master client, retry finishTask: " << e.what();
199
#endif
200
201
0
                client_status = client.reopen(config::thrift_rpc_timeout_ms);
202
0
                if (!client_status.ok()) {
203
0
                    return Status::InternalError(
204
0
                            "fail to get master client from cache. host={}, port={}, code={}",
205
0
                            _cluster_info->master_fe_addr.hostname,
206
0
                            _cluster_info->master_fe_addr.port, client_status.code());
207
0
                }
208
209
0
                client->confirmUnusedRemoteFiles(*result, request);
210
0
            } else {
211
                // TIMED_OUT exception. do not retry
212
                // actually we don't care what FE returns.
213
0
                return Status::InternalError(
214
0
                        "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
215
0
                        _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
216
0
                        client_status.code(), e.what());
217
0
            }
218
0
        }
219
0
    } catch (std::exception& e) {
220
0
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
221
0
        return Status::InternalError(
222
0
                "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
223
0
                _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
224
0
                client_status.code(), e.what());
225
0
    }
226
227
0
    return Status::OK();
228
0
}
229
230
0
bool AgentUtils::exec_cmd(const std::string& command, std::string* errmsg, bool redirect_stderr) {
231
    // The exit status of the command.
232
0
    uint32_t rc = 0;
233
234
    // Redirect stderr to stdout to get error message.
235
0
    std::string cmd = command;
236
0
    if (redirect_stderr) {
237
0
        cmd += " 2>&1";
238
0
    }
239
240
    // Execute command.
241
0
    FILE* fp = popen(cmd.c_str(), "r");
242
0
    if (fp == nullptr) {
243
0
        *errmsg = fmt::format("popen failed. {}, with errno: {}.\n", strerror(errno), errno);
244
0
        return false;
245
0
    }
246
247
    // Get command output.
248
0
    char result[1024] = {'\0'};
249
0
    while (fgets(result, sizeof(result), fp) != nullptr) {
250
0
        *errmsg += result;
251
0
    }
252
253
    // Waits for the associated process to terminate and returns.
254
0
    rc = pclose(fp);
255
0
    if (rc == -1) {
256
0
        if (errno == ECHILD) {
257
0
            *errmsg += "pclose cannot obtain the child status.\n";
258
0
        } else {
259
0
            *errmsg += fmt::format("Close popen failed. {}, with errno: {}.\n", strerror(errno),
260
0
                                   errno);
261
0
        }
262
0
        return false;
263
0
    }
264
265
    // Get return code of command.
266
0
    int32_t status_child = WEXITSTATUS(rc);
267
0
    if (status_child == 0) {
268
0
        return true;
269
0
    } else {
270
0
        return false;
271
0
    }
272
0
}
273
274
bool AgentUtils::write_json_to_file(const std::map<std::string, std::string>& info,
275
0
                                    const std::string& path) {
276
0
    rapidjson::Document json_info(rapidjson::kObjectType);
277
0
    for (auto& it : info) {
278
0
        json_info.AddMember(rapidjson::Value(it.first.c_str(), json_info.GetAllocator()).Move(),
279
0
                            rapidjson::Value(it.second.c_str(), json_info.GetAllocator()).Move(),
280
0
                            json_info.GetAllocator());
281
0
    }
282
0
    rapidjson::StringBuffer json_info_str;
283
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(json_info_str);
284
0
    json_info.Accept(writer);
285
0
    std::ofstream fp(path);
286
0
    if (!fp) {
287
0
        return false;
288
0
    }
289
0
    fp << json_info_str.GetString() << std::endl;
290
0
    fp.close();
291
292
0
    return true;
293
0
}
294
295
} // namespace doris