Coverage Report

Created: 2025-05-09 17:05

/root/doris/be/src/agent/utils.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/utils.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <fmt/format.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/Types_types.h>
26
#include <glog/logging.h>
27
#include <rapidjson/document.h>
28
#include <rapidjson/encodings.h>
29
#include <rapidjson/rapidjson.h>
30
#include <rapidjson/stringbuffer.h>
31
#include <rapidjson/writer.h>
32
#include <stdint.h>
33
#include <stdlib.h>
34
#include <string.h>
35
#include <thrift/transport/TTransportException.h>
36
37
#include <cstdio>
38
#include <exception>
39
#include <fstream>
40
#include <memory>
41
#include <utility>
42
43
#include "common/config.h"
44
#include "common/status.h"
45
#include "runtime/client_cache.h"
46
47
namespace doris {
48
class TConfirmUnusedRemoteFilesRequest;
49
class TConfirmUnusedRemoteFilesResult;
50
class TFinishTaskRequest;
51
class TMasterResult;
52
class TReportRequest;
53
} // namespace doris
54
55
using std::map;
56
using std::string;
57
using apache::thrift::transport::TTransportException;
58
59
namespace doris {
60
61
// File-local cache of FrontendService thrift clients. Every RPC method in this
// file hands its address to a FrontendServiceConnection to obtain a connection.
static FrontendServiceClientCache s_client_cache;
62
// Owner of the MasterServerClient singleton: (re)assigned by
// MasterServerClient::create() and exposed by MasterServerClient::instance().
static std::unique_ptr<MasterServerClient> s_client;
63
64
3
// Builds a new MasterServerClient from `master_info`, installs it as the
// file-wide singleton (destroying any previously installed client) and returns
// a non-owning pointer to it.
MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info) {
    std::unique_ptr<MasterServerClient> fresh_client(new MasterServerClient(master_info));
    s_client = std::move(fresh_client);
    return s_client.get();
}
68
69
341k
// Returns a non-owning pointer to the client currently held by s_client, or
// nullptr when s_client is empty.
MasterServerClient* MasterServerClient::instance() {
    MasterServerClient* const current = s_client.get();
    return current;
}
72
73
// Initializes _master_info from `master_info`. Its network_address is the
// endpoint used by the RPC methods of this class.
// NOTE(review): whether _master_info is a copy or a reference depends on the
// member declaration in the header, which is not visible here.
MasterServerClient::MasterServerClient(const TMasterInfo& master_info)
        : _master_info(master_info) {
}
75
76
340k
// Sends a finishTask RPC to the master at _master_info.network_address.
//
// A connection is obtained through s_client_cache. If the first attempt throws
// a TTransportException the connection is reopened and the call is retried
// once. Any other std::exception (including one thrown by the retry) is logged,
// the connection is reopened and an error status is returned.
//
// @param request  request forwarded to FrontendService finishTask.
// @param result   output object passed to the RPC.
// @return Status::OK() when the RPC returned without throwing;
//         Status::InternalError when no connection could be obtained or the
//         RPC failed; the reopen status if reopening fails in the outer handler.
Status MasterServerClient::finish_task(const TFinishTaskRequest& request, TMasterResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        LOG(WARNING) << "fail to get master client from cache. "
                     << "host=" << _master_info.network_address.hostname
                     << ", port=" << _master_info.network_address.port
                     << ", code=" << client_status.code();
        return Status::InternalError("Failed to get master client");
    }

    try {
        try {
            client->finishTask(*result, request);
        } catch ([[maybe_unused]] const TTransportException& e) {
            // The warnings in this handler are compiled out in ADDRESS_SANITIZER builds.
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "master client, retry finishTask: " << e.what();
#endif
            client_status = client.reopen(config::thrift_rpc_timeout_ms);
            if (!client_status.ok()) {
#ifndef ADDRESS_SANITIZER
                LOG(WARNING) << "fail to get master client from cache. "
                             << "host=" << _master_info.network_address.hostname
                             << ", port=" << _master_info.network_address.port
                             << ", code=" << client_status.code();
#endif
                return Status::InternalError("Master client finish task failed");
            }
            // Single retry on the reopened connection; a failure here is
            // handled by the outer catch.
            client->finishTask(*result, request);
        }
    } catch (const std::exception& e) {
        // Log first: RETURN_IF_ERROR returns early when the reopen fails, and
        // the original failure reason must not be lost in that case.
        LOG(WARNING) << "fail to finish_task. "
                     << "host=" << _master_info.network_address.hostname
                     << ", port=" << _master_info.network_address.port << ", error=" << e.what();
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError("Fail to finish task");
    }

    return Status::OK();
}
118
119
731
// Sends a report RPC to the master at _master_info.network_address.
//
// A connection is obtained through s_client_cache. If the first attempt throws
// a TTransportException other than TIMED_OUT, the connection is reopened and
// the call is retried once; a TIMED_OUT exception is not retried. Any other
// std::exception (including one thrown by the retry) is logged, the connection
// is reopened and an error status is returned.
//
// @param request  request forwarded to FrontendService report.
// @param result   output object passed to the RPC.
// @return Status::OK() when the RPC returned without throwing;
//         Status::InternalError when no connection could be obtained, the call
//         timed out, or the RPC failed; the reopen status if reopening fails in
//         the outer handler.
Status MasterServerClient::report(const TReportRequest& request, TMasterResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        LOG(WARNING) << "fail to get master client from cache. "
                     << "host=" << _master_info.network_address.hostname
                     << ", port=" << _master_info.network_address.port
                     << ", code=" << client_status;
        return Status::InternalError("Fail to get master client from cache");
    }

    try {
        try {
            client->report(*result, request);
        } catch (const TTransportException& e) {
            // The warnings in this handler are compiled out in ADDRESS_SANITIZER builds.
            const TTransportException::TTransportExceptionType type = e.getType();
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
#ifndef ADDRESS_SANITIZER
                // if not TIMED_OUT, retry
                LOG(WARNING) << "master client, retry report: " << e.what();
#endif

                client_status = client.reopen(config::thrift_rpc_timeout_ms);
                if (!client_status.ok()) {
#ifndef ADDRESS_SANITIZER
                    LOG(WARNING) << "fail to get master client from cache. "
                                 << "host=" << _master_info.network_address.hostname
                                 << ", port=" << _master_info.network_address.port
                                 << ", code=" << client_status.code();
#endif
                    return Status::InternalError("Fail to get master client from cache");
                }

                // Single retry on the reopened connection; a failure here is
                // handled by the outer catch.
                client->report(*result, request);
            } else {
                // TIMED_OUT exception. do not retry
                // actually we don't care what FE returns.
#ifndef ADDRESS_SANITIZER
                LOG(WARNING) << "fail to report to master: " << e.what();
#endif
                return Status::InternalError("Fail to report to master");
            }
        }
    } catch (const std::exception& e) {
        // Log first: RETURN_IF_ERROR returns early when the reopen fails, and
        // the original failure reason must not be lost in that case.
        LOG(WARNING) << "fail to report to master. "
                     << "host=" << _master_info.network_address.hostname
                     << ", port=" << _master_info.network_address.port
                     << ", code=" << client_status.code() << ", reason=" << e.what();
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError("Fail to report to master");
    }

    return Status::OK();
}
175
176
// Sends a confirmUnusedRemoteFiles RPC to the master at
// _master_info.network_address.
//
// A connection is obtained through s_client_cache. If the first attempt throws
// a TTransportException other than TIMED_OUT, the connection is reopened and
// the call is retried once; a TIMED_OUT exception is not retried. Any other
// std::exception (including one thrown by the retry) causes the connection to
// be reopened and an error status carrying the exception text to be returned.
//
// @param request  request forwarded to FrontendService confirmUnusedRemoteFiles.
// @param result   output object passed to the RPC.
// @return Status::OK() when the RPC returned without throwing;
//         Status::InternalError describing host, port and reason otherwise;
//         the reopen status if reopening fails in the outer handler.
Status MasterServerClient::confirm_unused_remote_files(
        const TConfirmUnusedRemoteFilesRequest& request, TConfirmUnusedRemoteFilesResult* result) {
    Status client_status;
    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
                                     config::thrift_rpc_timeout_ms, &client_status);

    if (!client_status.ok()) {
        return Status::InternalError(
                "fail to get master client from cache. host={}, port={}, code={}",
                _master_info.network_address.hostname, _master_info.network_address.port,
                client_status.code());
    }
    try {
        try {
            client->confirmUnusedRemoteFiles(*result, request);
        } catch (const TTransportException& e) {
            const TTransportException::TTransportExceptionType type = e.getType();
            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
                // This warning is compiled out in ADDRESS_SANITIZER builds.
#ifndef ADDRESS_SANITIZER
                // if not TIMED_OUT, retry
                LOG(WARNING) << "master client, retry confirmUnusedRemoteFiles: " << e.what();
#endif

                client_status = client.reopen(config::thrift_rpc_timeout_ms);
                if (!client_status.ok()) {
                    return Status::InternalError(
                            "fail to get master client from cache. host={}, port={}, code={}",
                            _master_info.network_address.hostname,
                            _master_info.network_address.port, client_status.code());
                }

                // Single retry on the reopened connection; a failure here is
                // handled by the outer catch.
                client->confirmUnusedRemoteFiles(*result, request);
            } else {
                // TIMED_OUT exception. do not retry
                // actually we don't care what FE returns.
                return Status::InternalError(
                        "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
                        _master_info.network_address.hostname, _master_info.network_address.port,
                        client_status.code(), e.what());
            }
        }
    } catch (const std::exception& e) {
        RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
        return Status::InternalError(
                "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
                _master_info.network_address.hostname, _master_info.network_address.port,
                client_status.code(), e.what());
    }

    return Status::OK();
}
227
228
0
bool AgentUtils::exec_cmd(const string& command, string* errmsg, bool redirect_stderr) {
229
    // The exit status of the command.
230
0
    uint32_t rc = 0;
231
232
    // Redirect stderr to stdout to get error message.
233
0
    string cmd = command;
234
0
    if (redirect_stderr) {
235
0
        cmd += " 2>&1";
236
0
    }
237
238
    // Execute command.
239
0
    FILE* fp = popen(cmd.c_str(), "r");
240
0
    if (fp == nullptr) {
241
0
        *errmsg = fmt::format("popen failed. {}, with errno: {}.\n", strerror(errno), errno);
242
0
        return false;
243
0
    }
244
245
    // Get command output.
246
0
    char result[1024] = {'\0'};
247
0
    while (fgets(result, sizeof(result), fp) != nullptr) {
248
0
        *errmsg += result;
249
0
    }
250
251
    // Waits for the associated process to terminate and returns.
252
0
    rc = pclose(fp);
253
0
    if (rc == -1) {
254
0
        if (errno == ECHILD) {
255
0
            *errmsg += "pclose cannot obtain the child status.\n";
256
0
        } else {
257
0
            *errmsg += fmt::format("Close popen failed. {}, with errno: {}.\n", strerror(errno),
258
0
                                   errno);
259
0
        }
260
0
        return false;
261
0
    }
262
263
    // Get return code of command.
264
0
    int32_t status_child = WEXITSTATUS(rc);
265
0
    if (status_child == 0) {
266
0
        return true;
267
0
    } else {
268
0
        return false;
269
0
    }
270
0
}
271
272
0
bool AgentUtils::write_json_to_file(const map<string, string>& info, const string& path) {
273
0
    rapidjson::Document json_info(rapidjson::kObjectType);
274
0
    for (auto& it : info) {
275
0
        json_info.AddMember(rapidjson::Value(it.first.c_str(), json_info.GetAllocator()).Move(),
276
0
                            rapidjson::Value(it.second.c_str(), json_info.GetAllocator()).Move(),
277
0
                            json_info.GetAllocator());
278
0
    }
279
0
    rapidjson::StringBuffer json_info_str;
280
0
    rapidjson::Writer<rapidjson::StringBuffer> writer(json_info_str);
281
0
    json_info.Accept(writer);
282
0
    std::ofstream fp(path);
283
0
    if (!fp) {
284
0
        return false;
285
0
    }
286
0
    fp << json_info_str.GetString() << std::endl;
287
0
    fp.close();
288
289
0
    return true;
290
0
}
291
292
} // namespace doris