Coverage Report

Created: 2024-11-21 12:22

/root/doris/be/src/io/hdfs_builder.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/hdfs_builder.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <cstdarg>
24
#include <cstdlib>
25
#include <utility>
26
#include <vector>
27
28
#include "common/config.h"
29
#include "common/logging.h"
30
#ifdef USE_HADOOP_HDFS
31
#include "hadoop_hdfs/hdfs.h"
32
#endif
33
#include "io/fs/hdfs.h"
34
#include "util/string_util.h"
35
36
namespace doris {
37
38
#ifdef USE_HADOOP_HDFS
39
0
void err_log_message(const char* fmt, ...) {
40
0
    va_list args;
41
0
    va_start(args, fmt);
42
43
    // First, call vsnprintf to get the required buffer size
44
0
    int size = vsnprintf(nullptr, 0, fmt, args) + 1; // +1 for '\0'
45
0
    if (size <= 0) {
46
0
        LOG(ERROR) << "Error formatting log message, invalid size";
47
0
        va_end(args);
48
0
        return;
49
0
    }
50
51
0
    va_end(args);
52
0
    va_start(args, fmt); // Reinitialize va_list
53
54
    // Allocate a buffer and format the string into it
55
0
    std::vector<char> buffer(size);
56
0
    vsnprintf(buffer.data(), size, fmt, args);
57
58
0
    va_end(args);
59
60
    // Use glog to log the message
61
0
    LOG(ERROR) << buffer.data();
62
0
}
63
64
0
void va_err_log_message(const char* fmt, va_list ap) {
65
0
    va_list args_copy;
66
0
    va_copy(args_copy, ap);
67
68
    // Call vsnprintf to get the required buffer size
69
0
    int size = vsnprintf(nullptr, 0, fmt, args_copy) + 1; // +1 for '\0'
70
0
    va_end(args_copy);                                    // Release the copied va_list
71
72
0
    if (size <= 0) {
73
0
        LOG(ERROR) << "Error formatting log message, invalid size";
74
0
        return;
75
0
    }
76
77
    // Reinitialize va_list for the second vsnprintf call
78
0
    va_copy(args_copy, ap);
79
80
    // Allocate a buffer and format the string into it
81
0
    std::vector<char> buffer(size);
82
0
    vsnprintf(buffer.data(), size, fmt, args_copy);
83
84
0
    va_end(args_copy);
85
86
    // Use glog to log the message
87
0
    LOG(ERROR) << buffer.data();
88
0
}
89
90
// Callback table passed to hdfsSetLogger(); both entries forward the
// formatted message to glog at ERROR severity.
struct hdfsLogger logger = {
        .errLogMessage = err_log_message,      // variadic (printf-style) entry point
        .vaErrLogMessage = va_err_log_message, // va_list entry point
};
92
#endif // #ifdef USE_HADOOP_HDFS
93
94
0
// Allocates the underlying hdfs builder handle and stores it in `hdfs_builder`.
//
// When built with USE_HADOOP_HDFS, the file-level `logger` callback table is
// registered via hdfsSetLogger() the first time this function runs; the
// std::once_flag guarantees the registration happens only once per process.
//
// Returns Status::InternalError if hdfsNewBuilder() returns nullptr, otherwise
// Status::OK() after requesting a forced new instance on the handle.
Status HDFSCommonBuilder::init_hdfs_builder() {
#ifdef USE_HADOOP_HDFS
    static std::once_flag flag;
    std::call_once(flag, []() { hdfsSetLogger(&logger); });
#endif // #ifdef USE_HADOOP_HDFS

    hdfs_builder = hdfsNewBuilder();
    if (hdfs_builder == nullptr) {
        // Same text is logged and returned so the failure is visible in the
        // log as well as to the caller.
        LOG(INFO) << "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml";
        return Status::InternalError(
                "failed to init HDFSCommonBuilder, please check be/conf/hdfs-site.xml");
    }
    hdfsBuilderSetForceNewInstance(hdfs_builder);
    return Status::OK();
}
109
110
0
// Applies the kerberos-related settings to `hdfs_builder` and validates them.
//
// - If config::kerberos_ccache_path is non-empty, it is set as the ticket
//   cache path and nothing else is checked.
// - Otherwise both hdfs_kerberos_principal and hdfs_kerberos_keytab must be
//   non-empty (else Status::InvalidArgument), and keytab auto-renewal is
//   switched on.
Status HDFSCommonBuilder::check_krb_params() {
    const std::string ccache_path = doris::config::kerberos_ccache_path;
    const bool has_ticket_cache = !ccache_path.empty();
    if (has_ticket_cache) {
        hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path",
                              ccache_path.c_str());
        return Status::OK();
    }

    // No ticket cache configured: a principal and a keytab are both required
    // in order to log in to the KDC.
    const bool missing_credentials =
            hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty();
    if (missing_credentials) {
        return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
    }

    // Turn on the auto-renew thread for keytab logins.
    hdfsBuilderConfSetStr(hdfs_builder, "hadoop.kerberos.keytab.login.autorenewal.enabled", "true");
    return Status::OK();
}
125
126
0
// Converts a flat property map into a THdfsParams.
//
// Keys equal to FS_KEY, USER, KERBEROS_PRINCIPAL and KERBEROS_KEYTAB populate
// the corresponding dedicated fields; every other entry is appended to the
// generic hdfs_conf list as a THdfsConf{key, value}. If no user was supplied
// and the HADOOP_USER_NAME environment variable is set, its value is used as
// the user.
//
// properties: input key/value pairs.
// Returns the populated THdfsParams (hdfs_conf is always set, possibly empty).
THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
    // The entries are funnelled through StringCaseMap exactly as before so that
    // whatever key ordering / de-duplication that container applies is kept.
    // NOTE(review): the key comparisons below are exact (case-sensitive)
    // matches — confirm that is intended given the container's name.
    const StringCaseMap<std::string> prop(properties.begin(), properties.end());

    std::vector<THdfsConf> hdfs_configs;
    hdfs_configs.reserve(prop.size());
    THdfsParams hdfsParams;

    for (const auto& [key, value] : prop) {
        if (key == FS_KEY) {
            hdfsParams.__set_fs_name(value);
        } else if (key == USER) {
            hdfsParams.__set_user(value);
        } else if (key == KERBEROS_PRINCIPAL) {
            hdfsParams.__set_hdfs_kerberos_principal(value);
        } else if (key == KERBEROS_KEYTAB) {
            hdfsParams.__set_hdfs_kerberos_keytab(value);
        } else {
            // Anything unrecognised is passed through as a raw hdfs config.
            THdfsConf& item = hdfs_configs.emplace_back();
            item.key = key;
            item.value = value;
        }
    }

    if (!hdfsParams.__isset.user) {
        // Read the environment once: using the same pointer for the null check
        // and for the assignment avoids building a string from a null pointer
        // if the variable disappears between two separate getenv() calls.
        if (const char* env_user = std::getenv("HADOOP_USER_NAME"); env_user != nullptr) {
            hdfsParams.__set_user(env_user);
        }
    }

    hdfsParams.__set_hdfs_conf(hdfs_configs);
    return hdfsParams;
}
157
158
// Prepares `builder` for connecting to the HDFS name node `fs_name` using the
// settings carried by `hdfsParams`.
//
// Order of operations:
//   1. initialise the builder handle (errors are propagated);
//   2. set the name node;
//   3. apply keytab, then principal (or, failing that, plain user name);
//   4. apply every generic hdfs_conf entry;
//   5. if builder->is_kerberos() reports true, run check_krb_params();
//   6. allow fallback to simple auth.
//
// Returns the first failing Status, or Status::OK().
Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name,
                           HDFSCommonBuilder* builder) {
    RETURN_IF_ERROR(builder->init_hdfs_builder());
    hdfsBuilderSetNameNode(builder->get(), fs_name.c_str());

    const auto& provided = hdfsParams.__isset;

    // --- kerberos: keytab ---
    if (provided.hdfs_kerberos_keytab) {
        builder->kerberos_login = true;
        builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
#ifdef USE_HADOOP_HDFS
        hdfsBuilderSetKerb5Conf(builder->get(), doris::config::kerberos_krb5_conf_path.c_str());
        hdfsBuilderSetKeyTabFile(builder->get(), hdfsParams.hdfs_kerberos_keytab.c_str());
#endif
    }

    // --- kerberos: principal, otherwise fall back to a plain user name ---
    if (provided.hdfs_kerberos_principal) {
        builder->kerberos_login = true;
        builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
        hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
    } else if (provided.user) {
        hdfsBuilderSetUserName(builder->get(), hdfsParams.user.c_str());
#ifdef USE_HADOOP_HDFS
        // User-name login: reset the krb5 conf and keytab on the handle.
        hdfsBuilderSetKerb5Conf(builder->get(), nullptr);
        hdfsBuilderSetKeyTabFile(builder->get(), nullptr);
#endif
    }

    // --- remaining generic configuration entries ---
    if (provided.hdfs_conf) {
        for (const auto& entry : hdfsParams.hdfs_conf) {
            hdfsBuilderConfSetStr(builder->get(), entry.key.c_str(), entry.value.c_str());
            LOG(INFO) << "set hdfs config: " << entry.key << ", value: " << entry.value;
#ifdef USE_HADOOP_HDFS
            // A java.security.krb5.conf entry in the catalog properties
            // overrides the krb5.conf path set on the handle.
            if (strcmp(entry.key.c_str(), "java.security.krb5.conf") == 0) {
                hdfsBuilderSetKerb5Conf(builder->get(), entry.value.c_str());
            }
#endif
        }
    }

    if (builder->is_kerberos()) {
        RETURN_IF_ERROR(builder->check_krb_params());
    }

    hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");
    return Status::OK();
}
201
202
// Convenience overload: turns the flat property map into THdfsParams via
// parse_properties() and delegates to the THdfsParams overload, using the
// parsed fs_name as the name node.
Status create_hdfs_builder(const std::map<std::string, std::string>& properties,
                           HDFSCommonBuilder* builder) {
    const THdfsParams parsed_params = parse_properties(properties);
    const std::string& name_node = parsed_params.fs_name;
    return create_hdfs_builder(parsed_params, name_node, builder);
}
207
208
} // namespace doris