Coverage Report

Created: 2024-11-21 11:46

/root/doris/be/src/agent/utils.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/utils.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fmt/format.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/Types_types.h>
26
#include <glog/logging.h>
27
#include <rapidjson/document.h>
28
#include <rapidjson/encodings.h>
29
#include <rapidjson/rapidjson.h>
30
#include <rapidjson/stringbuffer.h>
31
#include <rapidjson/writer.h>
32
#include <stdint.h>
33
#include <stdlib.h>
34
#include <string.h>
35
#include <thrift/transport/TTransportException.h>
36
37
#include <cstdio>
38
#include <exception>
39
#include <fstream>
40
#include <memory>
41
#include <utility>
42
43
#include "common/config.h"
44
#include "common/status.h"
45
#include "runtime/client_cache.h"
46
#include "runtime/cluster_info.h"
47
48
namespace doris {
49
class TConfirmUnusedRemoteFilesRequest;
50
class TConfirmUnusedRemoteFilesResult;
51
class TFinishTaskRequest;
52
class TMasterResult;
53
class TReportRequest;
54
} // namespace doris
55
56
using std::map;
57
using std::string;
58
using apache::thrift::transport::TTransportException;
59
60
namespace doris {
61
62
// File-scope thrift client cache. Every MasterServerClient RPC method in this file
// constructs its FrontendServiceConnection against this cache.
static FrontendServiceClientCache s_client_cache;
63
// File-scope MasterServerClient instance: (re)installed by MasterServerClient::create()
// and handed out as a raw pointer by MasterServerClient::instance(). Null until
// create() has been called.
static std::unique_ptr<MasterServerClient> s_client;
64
65
0
// Builds a MasterServerClient bound to `cluster_info` and installs it as the
// file-scope instance, destroying whichever instance was installed before.
// Returns a non-owning pointer to the newly installed client.
// NOTE(review): kept as plain `new`; std::make_unique would need the constructor to
// be accessible from <memory> -- confirm against the header before changing.
MasterServerClient* MasterServerClient::create(const ClusterInfo* cluster_info) {
    std::unique_ptr<MasterServerClient> fresh(new MasterServerClient(cluster_info));
    s_client = std::move(fresh);
    return s_client.get();
}
69
70
0
// Returns a non-owning pointer to the client most recently installed by create(),
// or nullptr when create() has not been called yet.
MasterServerClient* MasterServerClient::instance() {
    MasterServerClient* const installed = s_client.get();
    return installed;
}
73
74
MasterServerClient::MasterServerClient(const ClusterInfo* cluster_info)
75
0
        : _cluster_info(cluster_info) {}
76
77
0
// Sends a finishTask RPC to the master FE.
//
// A connection is obtained through the file-scope client cache. If the first attempt
// throws a TTransportException, the connection is reopened and the call is retried
// exactly once. Any other std::exception, or one thrown by the retry itself, lands
// in the outer handler.
//
// request: the task result to deliver.
// result:  out-parameter filled in by the thrift call; must not be null.
//
// Returns OK on success; InternalError when no client could be obtained or the RPC
// failed; RpcError when reopening the connection for the retry failed; or the error
// from the final reopen() when that reopen fails too.
Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMasterResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        LOG(WARNING) << "fail to get master client from cache. "
                     << "host=" << _cluster_info->master_fe_addr.hostname
                     << ", port=" << _cluster_info->master_fe_addr.port
                     << ", code=" << client_status.code();
        return Status::InternalError("Failed to get master client");
    }

    try {
        try {
            client->finishTask(*result, request);
        } catch ([[maybe_unused]] const TTransportException& e) {
            // Transport-level failure: reopen the connection and retry once.
#ifdef ADDRESS_SANITIZER
            LOG(WARNING) << "master client, retry finishTask: " << e.what();
#endif
            client_status = client.reopen(config::thrift_rpc_timeout_ms);
            if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
                LOG(WARNING) << "fail to get master client from cache. "
                             << "host=" << _cluster_info->master_fe_addr.hostname
                             << ", port=" << _cluster_info->master_fe_addr.port
                             << ", code=" << client_status.code();
#endif
                return Status::RpcError("Master client finish task failed");
            }
            client->finishTask(*result, request);
        }
    } catch (const std::exception& e) {
        // Log before reopening: RETURN_IF_ERROR returns early when reopen() fails,
        // and the reason for the RPC failure must not be lost in that case.
        LOG(WARNING) << "fail to finish_task. "
                     << "host=" << _cluster_info->master_fe_addr.hostname
                     << ", port=" << _cluster_info->master_fe_addr.port << ", error=" << e.what();
        // Presumably reopened so that a broken connection is not handed back to the
        // cache -- TODO confirm against FrontendServiceConnection.
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError("Fail to finish task");
    }

    return Status::OK();
}
119
120
0
// Sends a report RPC to the master FE.
//
// A connection is obtained through the file-scope client cache. A
// TTransportException other than TIMED_OUT triggers one reopen-and-retry; a
// TIMED_OUT is not retried. Any other std::exception, or one thrown by the retry
// itself, lands in the outer handler.
//
// request: the report to deliver.
// result:  out-parameter filled in by the thrift call; must not be null.
//
// Returns OK on success; InternalError when no client could be obtained, the
// reopen for the retry failed, the call timed out, or the RPC failed; or the error
// from the final reopen() when that reopen fails too.
Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        LOG(WARNING) << "fail to get master client from cache. "
                     << "host=" << _cluster_info->master_fe_addr.hostname
                     << ", port=" << _cluster_info->master_fe_addr.port
                     << ", code=" << client_status;
        return Status::InternalError("Fail to get master client from cache");
    }

    try {
        try {
            client->report(*result, request);
        } catch (const TTransportException& e) {
            const TTransportException::TTransportExceptionType type = e.getType();
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
                // if not TIMED_OUT, retry
#ifdef ADDRESS_SANITIZER
                LOG(WARNING) << "master client, retry report: " << e.what();
#endif

                client_status = client.reopen(config::thrift_rpc_timeout_ms);
                if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
                    LOG(WARNING) << "fail to get master client from cache. "
                                 << "host=" << _cluster_info->master_fe_addr.hostname
                                 << ", port=" << _cluster_info->master_fe_addr.port
                                 << ", code=" << client_status.code();
#endif
                    return Status::InternalError("Fail to get master client from cache");
                }

                client->report(*result, request);
            } else {
                // TIMED_OUT exception. do not retry
                // actually we don't care what FE returns.
#ifdef ADDRESS_SANITIZER
                LOG(WARNING) << "fail to report to master: " << e.what();
#endif
                return Status::InternalError("Fail to report to master");
            }
        }
    } catch (const std::exception& e) {
        // Log before reopening: RETURN_IF_ERROR returns early when reopen() fails,
        // and the reason for the RPC failure must not be lost in that case.
        // `client_status` here is the status of the last connect/reopen attempt,
        // not of the failed RPC.
        LOG(WARNING) << "fail to report to master. "
                     << "host=" << _cluster_info->master_fe_addr.hostname
                     << ", port=" << _cluster_info->master_fe_addr.port
                     << ", code=" << client_status.code() << ", reason=" << e.what();
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError("Fail to report to master");
    }

    return Status::OK();
}
176
177
// Sends a confirmUnusedRemoteFiles RPC to the master FE.
//
// A connection is obtained through the file-scope client cache. A
// TTransportException other than TIMED_OUT triggers one reopen-and-retry; a
// TIMED_OUT is not retried. Any other std::exception, or one thrown by the retry
// itself, lands in the outer handler.
//
// request: the request to deliver.
// result:  out-parameter filled in by the thrift call; must not be null.
//
// Returns OK on success; InternalError (with host/port/code details) when no client
// could be obtained, the reopen for the retry failed, the call timed out, or the
// RPC failed; or the error from the final reopen() when that reopen fails too.
Status MasterServerClient::confirm_unused_remote_files(
        const TConfirmUnusedRemoteFilesRequest& request, TConfirmUnusedRemoteFilesResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _cluster_info->master_fe_addr,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        return Status::InternalError(
                "fail to get master client from cache. host={}, port={}, code={}",
                _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
                client_status.code());
    }
    try {
        try {
            client->confirmUnusedRemoteFiles(*result, request);
        } catch (const TTransportException& e) {
            const TTransportException::TTransportExceptionType type = e.getType();
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
                // if not TIMED_OUT, retry
                LOG(WARNING) << "master client, retry confirmUnusedRemoteFiles: " << e.what();

                client_status = client.reopen(config::thrift_rpc_timeout_ms);
                if (!client_status.ok()) {
                    return Status::InternalError(
                            "fail to get master client from cache. host={}, port={}, code={}",
                            _cluster_info->master_fe_addr.hostname,
                            _cluster_info->master_fe_addr.port, client_status.code());
                }

                client->confirmUnusedRemoteFiles(*result, request);
            } else {
                // TIMED_OUT exception. do not retry
                // actually we don't care what FE returns.
                // `client_status` is still the status of the initial connect here.
                return Status::InternalError(
                        "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
                        _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
                        client_status.code(), e.what());
            }
        }
    } catch (const std::exception& e) {
        // If this reopen fails, its status is returned instead of the RPC error.
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError(
                "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
                _cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
                client_status.code(), e.what());
    }

    return Status::OK();
}
226
227
0
bool AgentUtils::exec_cmd(const string& command, string* errmsg, bool redirect_stderr) {
228
    // The exit status of the command.
229
0
    uint32_t rc = 0;
230
231
    // Redirect stderr to stdout to get error message.
232
0
    string cmd = command;
233
0
    if (redirect_stderr) {
234
0
        cmd += " 2>&1";
235
0
    }
236
237
    // Execute command.
238
0
    FILE* fp = popen(cmd.c_str(), "r");
239
0
    if (fp == nullptr) {
240
0
        *errmsg = fmt::format("popen failed. {}, with errno: {}.\n", strerror(errno), errno);
241
0
        return false;
242
0
    }
243
244
    // Get command output.
245
0
    char result[1024] = {'\0'};
246
0
    while (fgets(result, sizeof(result), fp) != nullptr) {
247
0
        *errmsg += result;
248
0
    }
249
250
    // Waits for the associated process to terminate and returns.
251
0
    rc = pclose(fp);
252
0
    if (rc == -1) {
253
0
        if (errno == ECHILD) {
254
0
            *errmsg += "pclose cannot obtain the child status.\n";
255
0
        } else {
256
0
            *errmsg += fmt::format("Close popen failed. {}, with errno: {}.\n", strerror(errno),
257
0
                                   errno);
258
0
        }
259
0
        return false;
260
0
    }
261
262
    // Get return code of command.
263
0
    int32_t status_child = WEXITSTATUS(rc);
264
0
    if (status_child == 0) {
265
0
        return true;
266
0
    } else {
267
0
        return false;
268
0
    }
269
0
}
270
271
0
bool AgentUtils::write_json_to_file(const map<string, string>& info, const string& path) {
272
0
    rapidjson::Document json_info(rapidjson::kObjectType);
273
0
    for (auto& it : info) {
274
0
        json_info.AddMember(rapidjson::Value(it.first.c_str(), json_info.GetAllocator()).Move(),
275
0
                            rapidjson::Value(it.second.c_str(), json_info.GetAllocator()).Move(),
276
0
                            json_info.GetAllocator());
277
0
    }
278
0
    rapidjson::StringBuffer json_info_str;
279
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(json_info_str);
280
0
    json_info.Accept(writer);
281
0
    std::ofstream fp(path);
282
0
    if (!fp) {
283
0
        return false;
284
0
    }
285
0
    fp << json_info_str.GetString() << std::endl;
286
0
    fp.close();
287
288
0
    return true;
289
0
}
290
291
} // namespace doris