Coverage Report

Created: 2025-04-25 11:21

/root/doris/be/src/io/hdfs_builder.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/hdfs_builder.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <cstdarg>
24
#include <cstdlib>
25
#include <utility>
26
#include <vector>
27
28
#include "common/config.h"
29
#include "common/kerberos/kerberos_ticket_mgr.h"
30
#include "common/logging.h"
31
#ifdef USE_HADOOP_HDFS
32
#include "hadoop_hdfs/hdfs.h"
33
#endif
34
#include "io/fs/hdfs.h"
35
#include "runtime/exec_env.h"
36
#include "util/string_util.h"
37
38
namespace doris {
39
40
#ifdef USE_DORIS_HADOOP_HDFS
41
0
// Defined below; declared here so the variadic entry point can reuse it.
void va_err_log_message(const char* fmt, va_list ap);

// Variadic libhdfs error callback: formats a printf-style message and emits it
// with LOG(ERROR). The two-pass "measure, then format" logic lives only in
// va_err_log_message; this wrapper just packages its arguments into a va_list.
void err_log_message(const char* fmt, ...) {
    va_list args;
    va_start(args, fmt);
    // va_err_log_message reads `args` exclusively through va_copy, so `args`
    // is still in a valid state to be released here.
    va_err_log_message(fmt, args);
    va_end(args);
}
65
66
0
void va_err_log_message(const char* fmt, va_list ap) {
67
0
    va_list args_copy;
68
0
    va_copy(args_copy, ap);
69
70
    // Call vsnprintf to get the required buffer size
71
0
    int size = vsnprintf(nullptr, 0, fmt, args_copy) + 1; // +1 for '\0'
72
0
    va_end(args_copy);                                    // Release the copied va_list
73
74
0
    if (size <= 0) {
75
0
        LOG(ERROR) << "Error formatting log message, invalid size";
76
0
        return;
77
0
    }
78
79
    // Reinitialize va_list for the second vsnprintf call
80
0
    va_copy(args_copy, ap);
81
82
    // Allocate a buffer and format the string into it
83
0
    std::vector<char> buffer(size);
84
0
    vsnprintf(buffer.data(), size, fmt, args_copy);
85
86
0
    va_end(args_copy);
87
88
    // Use glog to log the message
89
0
    LOG(ERROR) << buffer.data();
90
0
}
91
92
// Logger hooks handed to libhdfs (via hdfsSetLogger in init_hdfs_builder): both
// the variadic and the va_list error callbacks forward to LOG(ERROR).
struct hdfsLogger logger{.errLogMessage = err_log_message,
                         .vaErrLogMessage = va_err_log_message};
94
#endif // #ifdef USE_DORIS_HADOOP_HDFS
95
96
0
// Allocates the underlying libhdfs builder for this HDFSCommonBuilder.
//
// When built against Doris' libhdfs (USE_DORIS_HADOOP_HDFS), the glog-backed
// `logger` is registered with hdfsSetLogger exactly once per process
// (std::call_once), no matter how many builders are created.
//
// Returns InternalError if hdfsNewBuilder() yields nullptr, OK otherwise.
Status HDFSCommonBuilder::init_hdfs_builder() {
#ifdef USE_DORIS_HADOOP_HDFS
    static std::once_flag flag;
    std::call_once(flag, []() { hdfsSetLogger(&logger); });
#endif // #ifdef USE_DORIS_HADOOP_HDFS

    hdfs_builder = hdfsNewBuilder();
    if (hdfs_builder == nullptr) {
        // A failed allocation is an error condition, so do not log it at INFO level.
        LOG(WARNING) << "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml";
        return Status::InternalError(
                "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml");
    }
    // Ask libhdfs for a new FileSystem instance instead of one from its cache.
    hdfsBuilderSetForceNewInstance(hdfs_builder);
    return Status::OK();
}
111
112
0
// Applies Kerberos login settings to hdfs_builder.
//
// If config::kerberos_ccache_path is non-empty, that ticket cache is handed to
// libhdfs and OK is returned without checking principal/keytab. Otherwise both
// hdfs_kerberos_principal and hdfs_kerberos_keytab must be non-empty
// (InvalidArgument if not), and keytab login auto-renewal is enabled.
Status HDFSCommonBuilder::check_krb_params() {
    // Bind to the config string itself instead of copying it into a local:
    // Apache libhdfs' hdfsBuilderConfSetStr only shallow-copies the value pointer,
    // which must stay valid until the builder is freed. A local std::string would
    // leave a dangling pointer once this function returns.
    // NOTE(review): assumes kerberos_ccache_path is not reassigned while the
    // builder is alive.
    const std::string& ticket_path = doris::config::kerberos_ccache_path;
    if (!ticket_path.empty()) {
        hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
                              ticket_path.c_str());
        return Status::OK();
    }
    // we should check hdfs_kerberos_principal and hdfs_kerberos_keytab nonnull to login kdc.
    if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
        return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
    }
    // enable auto-renew thread (string literals have static storage, so they are safe here)
    hdfsBuilderConfSetStr(hdfs_builder, "hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
    return Status::OK();
}
127
128
0
void HDFSCommonBuilder::set_hdfs_conf(const std::string& key, const std::string& val) {
129
0
    hdfs_conf[key] = val;
130
0
}
131
132
std::string HDFSCommonBuilder::get_hdfs_conf_value(const std::string& key,
133
0
                                                   const std::string& default_val) const {
134
0
    auto it = hdfs_conf.find(key);
135
0
    if (it != hdfs_conf.end()) {
136
0
        return it->second;
137
0
    } else {
138
0
        return default_val;
139
0
    }
140
0
}
141
142
0
void HDFSCommonBuilder::set_hdfs_conf_to_hdfs_builder() {
143
0
    for (const auto& pair : hdfs_conf) {
144
0
        hdfsBuilderConfSetStr(hdfs_builder, pair.first.c_str(), pair.second.c_str());
145
0
    }
146
0
}
147
148
// This method is deprecated, will be removed later
149
0
Status HDFSCommonBuilder::set_kerberos_ticket_cache() {
150
    // kerberos::KerberosConfig config;
151
    // config.set_principal_and_keytab(hdfs_kerberos_principal, hdfs_kerberos_keytab);
152
    // config.set_krb5_conf_path(config::kerberos_krb5_conf_path);
153
    // config.set_refresh_interval(config::kerberos_refresh_interval_second);
154
    // config.set_min_time_before_refresh(600);
155
    // kerberos::KerberosTicketMgr* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
156
    // RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
157
    // // ATTN, can't use ticket_cache->get_ticket_cache_path() directly,
158
    // // it may cause the kerberos ticket cache path in libhdfs is empty,
159
    // kerberos_ticket_path = ticket_cache->get_ticket_cache_path();
160
    // hdfsBuilderSetUserName(hdfs_builder, hdfs_kerberos_principal.c_str());
161
    // hdfsBuilderSetKerbTicketCachePath(hdfs_builder, kerberos_ticket_path.c_str());
162
    // hdfsBuilderSetForceNewInstance(hdfs_builder);
163
    // LOG(INFO) << "get kerberos ticket path: " << kerberos_ticket_path
164
    //           << " with principal: " << hdfs_kerberos_principal;
165
0
    return Status::OK();
166
0
}
167
168
0
// Converts a flat property map into THdfsParams.
//
// - Keys FS_KEY, USER, KERBEROS_PRINCIPAL and KERBEROS_KEYTAB populate the
//   matching dedicated THdfsParams fields.
// - Every other key/value pair is passed through unchanged in hdfs_conf.
// - If no user was given, HADOOP_USER_NAME from the environment is used when set.
THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
    // Copy into StringCaseMap (util/string_util.h). Presumably this is
    // case-insensitive on keys, so keys differing only in case collapse to a
    // single entry — TODO confirm against its definition.
    const StringCaseMap<std::string> prop(properties.begin(), properties.end());
    std::vector<THdfsConf> hdfs_configs;
    THdfsParams hdfsParams;
    for (const auto& [key, value] : prop) {
        if (key == FS_KEY) {
            hdfsParams.__set_fs_name(value);
        } else if (key == USER) {
            hdfsParams.__set_user(value);
        } else if (key == KERBEROS_PRINCIPAL) {
            hdfsParams.__set_hdfs_kerberos_principal(value);
        } else if (key == KERBEROS_KEYTAB) {
            hdfsParams.__set_hdfs_kerberos_keytab(value);
        } else {
            THdfsConf item;
            item.key = key;
            item.value = value;
            hdfs_configs.push_back(std::move(item));
        }
    }
    if (!hdfsParams.__isset.user) {
        // Read the environment once. Calling getenv twice could observe the variable
        // disappearing in between and pass nullptr to __set_user.
        if (const char* env_user = std::getenv("HADOOP_USER_NAME"); env_user != nullptr) {
            hdfsParams.__set_user(env_user);
        }
    }
    hdfsParams.__set_hdfs_conf(hdfs_configs);
    return hdfsParams;
}
199
200
// Initializes `builder` for the namenode `fs_name` from `hdfsParams`.
//
// - Every hdfsParams.hdfs_conf entry is recorded on the builder and pushed to
//   libhdfs. "hadoop.security.authentication" selects the auth mode; the
//   default is "simple".
// - "kerberos": principal and keytab must both be set and non-empty, otherwise
//   InvalidArgument is returned. Keytab login auto-renewal is enabled.
// - Any other mode: hdfsParams.user, if set, becomes the hdfs user name.
// - The FALLBACK_TO_SIMPLE_AUTH_ALLOWED option is always set to TRUE_VALUE.
//
// Strings handed to libhdfs are first stored in `builder` members. libhdfs may
// keep the raw c_str() pointers rather than copying them, so they must outlive
// this call.
Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name,
                           HDFSCommonBuilder* builder) {
    RETURN_IF_ERROR(builder->init_hdfs_builder());
    builder->fs_name = fs_name;
    hdfsBuilderSetNameNode(builder->get(), builder->fs_name.c_str());
    LOG(INFO) << "set hdfs namenode: " << fs_name;

    std::string auth_type = "simple";
    // First, copy all hdfs conf and set to hdfs builder
    if (hdfsParams.__isset.hdfs_conf) {
        for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
            builder->set_hdfs_conf(conf.key, conf.value);
            // NOTE(review): values are logged verbatim; confirm no secrets can arrive here.
            LOG(INFO) << "set hdfs config key: " << conf.key << ", value: " << conf.value;
            if (conf.key == "hadoop.security.authentication") {
                auth_type = conf.value;
            }
        }
        builder->set_hdfs_conf_to_hdfs_builder();
    }

    if (auth_type == "kerberos") {
        // An empty principal/keytab is as unusable as a missing one; this matches
        // the emptiness check in HDFSCommonBuilder::check_krb_params.
        const bool has_principal = hdfsParams.__isset.hdfs_kerberos_principal &&
                                   !hdfsParams.hdfs_kerberos_principal.empty();
        const bool has_keytab = hdfsParams.__isset.hdfs_kerberos_keytab &&
                                !hdfsParams.hdfs_kerberos_keytab.empty();
        if (!has_principal || !has_keytab) {
            return Status::InvalidArgument("Must set both principal and keytab");
        }
        builder->kerberos_login = true;
        builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
        builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
        hdfsBuilderSetPrincipal(builder->get(), builder->hdfs_kerberos_principal.c_str());
#ifdef USE_HADOOP_HDFS
        hdfsBuilderSetKerb5Conf(builder->get(), doris::config::kerberos_krb5_conf_path.c_str());
        hdfsBuilderSetKeyTabFile(builder->get(), builder->hdfs_kerberos_keytab.c_str());
#endif
        hdfsBuilderConfSetStr(builder->get(), "hadoop.kerberos.keytab.login.autorenewal.enabled",
                              "true");
        // RETURN_IF_ERROR(builder->set_kerberos_ticket_cache());
    } else if (hdfsParams.__isset.user) {
        builder->hadoop_user = hdfsParams.user;
        hdfsBuilderSetUserName(builder->get(), builder->hadoop_user.c_str());
    }
    hdfsBuilderConfSetStr(builder->get(), FALLBACK_TO_SIMPLE_AUTH_ALLOWED.c_str(),
                          TRUE_VALUE.c_str());
    return Status::OK();
}
248
249
// Convenience overload: parses a raw property map with parse_properties() and
// builds from the result. The parsed fs_name (taken from the FS_KEY property, if
// present) is used as the namenode.
Status create_hdfs_builder(const std::map<std::string, std::string>& properties,
                           HDFSCommonBuilder* builder) {
    const THdfsParams params = parse_properties(properties);
    const std::string& namenode = params.fs_name;
    return create_hdfs_builder(params, namenode, builder);
}
254
255
} // namespace doris