Coverage Report

Created: 2025-04-16 14:10

/root/doris/be/src/io/hdfs_util.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/bvar.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

#include "common/status.h"
#include "io/fs/hdfs.h"
#include "io/fs/path.h"
31
32
// Forward declaration only: the complete type is defined outside this header.
namespace cloud {
class HdfsVaultInfo;
} // namespace cloud
35
36
namespace doris {
37
// Forward declarations only; complete definitions live elsewhere.
class THdfsParams;
class HDFSCommonBuilder;
39
40
namespace io {
41
42
namespace hdfs_bvar {
43
extern bvar::LatencyRecorder hdfs_read_latency;
44
extern bvar::LatencyRecorder hdfs_write_latency;
45
extern bvar::LatencyRecorder hdfs_create_dir_latency;
46
extern bvar::LatencyRecorder hdfs_open_latency;
47
extern bvar::LatencyRecorder hdfs_close_latency;
48
extern bvar::LatencyRecorder hdfs_flush_latency;
49
extern bvar::LatencyRecorder hdfs_hflush_latency;
50
extern bvar::LatencyRecorder hdfs_hsync_latency;
51
}; // namespace hdfs_bvar
52
53
class HdfsHandler {
54
public:
55
    HdfsHandler(hdfsFS fs, bool cached)
56
            : hdfs_fs(fs),
57
              from_cache(cached),
58
              _create_time(std::chrono::duration_cast<std::chrono::milliseconds>(
59
                                   std::chrono::system_clock::now().time_since_epoch())
60
                                   .count()),
61
              _last_access_time(0),
62
0
              _invalid(false) {}
63
64
0
    ~HdfsHandler() {
65
0
        if (hdfs_fs != nullptr) {
66
            // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
67
            // even if we create a new one
68
            // hdfsDisconnect(hdfs_fs);
69
0
        }
70
0
        hdfs_fs = nullptr;
71
0
    }
72
73
0
    int64_t last_access_time() { return _last_access_time; }
74
75
0
    void update_last_access_time() {
76
0
        if (from_cache) {
77
0
            _last_access_time = std::chrono::duration_cast<std::chrono::milliseconds>(
78
0
                                        std::chrono::system_clock::now().time_since_epoch())
79
0
                                        .count();
80
0
        }
81
0
    }
82
83
0
    bool invalid() { return _invalid; }
84
85
0
    void set_invalid() { _invalid = true; }
86
87
    hdfsFS hdfs_fs;
88
    // When cache is full, and all handlers are in use, HdfsFileSystemCache will return an uncached handler.
89
    // Client should delete the handler in such case.
90
    const bool from_cache;
91
92
private:
93
    // For kerberos authentication, we need to save create time so that
94
    // we can know if the kerberos ticket is expired.
95
    std::atomic<uint64_t> _create_time;
96
    // HdfsFileSystemCache try to remove the oldest handler when the cache is full
97
    std::atomic<uint64_t> _last_access_time;
98
    // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler
99
    std::atomic<bool> _invalid;
100
};
101
102
// Cache for HdfsHandler
103
class HdfsHandlerCache {
104
public:
105
0
    static HdfsHandlerCache* instance() {
106
0
        static HdfsHandlerCache s_instance;
107
0
        return &s_instance;
108
0
    }
109
110
    HdfsHandlerCache(const HdfsHandlerCache&) = delete;
111
    const HdfsHandlerCache& operator=(const HdfsHandlerCache&) = delete;
112
113
    // This function is thread-safe
114
    Status get_connection(const THdfsParams& hdfs_params, const std::string& fs_name,
115
                          std::shared_ptr<HdfsHandler>* fs_handle);
116
117
private:
118
    static constexpr int MAX_CACHE_HANDLE = 64;
119
120
    std::mutex _lock;
121
    std::unordered_map<uint64_t, std::shared_ptr<HdfsHandler>> _cache;
122
123
0
    HdfsHandlerCache() = default;
124
125
    void _clean_invalid();
126
    void _clean_oldest();
127
};
128
129
// if the format of path is hdfs://ip:port/path, replace it to /path.
130
// path like hdfs://ip:port/path can't be used by libhdfs3.
131
Path convert_path(const Path& path, const std::string& namenode);
132
133
std::string get_fs_name(const std::string& path);
134
135
// return true if path_or_fs contains "hdfs://"
136
bool is_hdfs(const std::string& path_or_fs);
137
138
THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault);
139
140
} // namespace io
141
} // namespace doris