Coverage Report

Created: 2026-06-17 07:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/file_factory.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/file_factory.h"
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
24
#include <mutex>
25
#include <utility>
26
27
#include "common/cast_set.h"
28
#include "common/config.h"
29
#include "common/status.h"
30
#include "io/fs/broker_file_system.h"
31
#include "io/fs/broker_file_writer.h"
32
#include "io/fs/file_reader.h"
33
#include "io/fs/file_system.h"
34
#include "io/fs/hdfs/hdfs_mgr.h"
35
#include "io/fs/hdfs_file_reader.h"
36
#include "io/fs/hdfs_file_system.h"
37
#include "io/fs/hdfs_file_writer.h"
38
#include "io/fs/http_file_reader.h"
39
#include "io/fs/http_file_system.h"
40
#include "io/fs/local_file_system.h"
41
#include "io/fs/multi_table_pipe.h"
42
#include "io/fs/s3_file_reader.h"
43
#include "io/fs/s3_file_system.h"
44
#include "io/fs/s3_file_writer.h"
45
#include "io/fs/stream_load_pipe.h"
46
#include "io/hdfs_builder.h"
47
#include "io/hdfs_util.h"
48
#include "load/stream_load/new_load_stream_mgr.h"
49
#include "load/stream_load/stream_load_context.h"
50
#include "runtime/exec_env.h"
51
#include "runtime/runtime_state.h"
52
#include "util/s3_uri.h"
53
#include "util/s3_util.h"
54
#include "util/string_util.h"
55
#include "util/uid_util.h"
56
57
namespace doris {
58
59
constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";
60
61
namespace {
62
63
void append_identity_property(const StringCaseMap<std::string>& properties, const char* name,
64
96.8k
                              std::string* identity) {
65
96.8k
    auto it = properties.find(name);
66
96.8k
    if (it == properties.end()) {
67
32.3k
        return;
68
32.3k
    }
69
64.5k
    identity->push_back('\0');
70
64.5k
    identity->append(name);
71
64.5k
    identity->push_back('=');
72
64.5k
    identity->append(it->second);
73
64.5k
}
74
75
} // namespace
76
77
// Builds the reader options for `fd`.
//
// The file size and mtime are always copied from `fd`. When `state` is
// non-null, its query options may additionally:
//   - select FILE_BLOCK_CACHE (requires config::enable_file_cache, the query
//     option `enable_file_cache` being set and true, and
//     `fd.file_cache_admission`);
//   - set `cache_base_path` (when `file_cache_base_path` is set and differs
//     from RANDOM_CACHE_BASE_PATH).
io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
                                                      const io::FileDescription& fd) {
    io::FileReaderOptions opts {
            .cache_base_path {},
            .file_size = fd.file_size,
            .mtime = fd.mtime,
    };
    if (state == nullptr) {
        // Without a runtime state there are no query options to consult.
        return opts;
    }

    const auto& query_options = state->query_options();

    const bool cache_requested =
            query_options.__isset.enable_file_cache && query_options.enable_file_cache;
    if (config::enable_file_cache && cache_requested && fd.file_cache_admission) {
        opts.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
    }

    const bool has_fixed_base_path =
            query_options.__isset.file_cache_base_path &&
            query_options.file_cache_base_path != RANDOM_CACHE_BASE_PATH;
    if (has_fixed_base_path) {
        opts.cache_base_path = query_options.file_cache_base_path;
    }
    return opts;
}
95
96
2
int32_t get_broker_index(const std::vector<TNetworkAddress>& brokers, const std::string& path) {
97
2
    if (brokers.empty()) {
98
0
        return -1;
99
0
    }
100
101
    // firstly find local broker
102
2
    const auto local_host = BackendOptions::get_localhost();
103
2
    for (int32_t i = 0; i < brokers.size(); ++i) {
104
2
        if (brokers[i].hostname == local_host) {
105
2
            return i;
106
2
        }
107
2
    }
108
109
    // secondly select broker by hash of file path
110
0
    auto key = HashUtil::hash(path.data(), cast_set<uint32_t>(path.size()), 0);
111
112
0
    return key % brokers.size();
113
2
}
114
115
// Creates the file system selected by `fs_properties.type` for
// `file_description`.
//
// FILE_LOCAL returns the global local file system; every other supported type
// creates a file system with io::FileSystem::TMP_FS_ID. Unsupported types, an
// empty broker list, an unparsable S3 URI, or a missing/empty HTTP "uri"
// property yield an error result.
Result<io::FileSystemSPtr> FileFactory::create_fs(const io::FSPropertiesRef& fs_properties,
                                                  const io::FileDescription& file_description) {
    const std::string& path = file_description.path;

    switch (fs_properties.type) {
    case TFileType::FILE_LOCAL:
        return io::global_local_filesystem();
    case TFileType::FILE_BROKER: {
        const auto& brokers = *fs_properties.broker_addresses;
        const auto broker_idx = get_broker_index(brokers, path);
        if (broker_idx < 0) {
            return ResultError(Status::InternalError("empty broker_addresses"));
        }
        const auto& broker = brokers[broker_idx];
        LOG_INFO("select broker: {} for file {}", broker.hostname, path);
        return io::BrokerFileSystem::create(broker, *fs_properties.properties,
                                            io::FileSystem::TMP_FS_ID);
    }
    case TFileType::FILE_S3: {
        // The S3 configuration is derived from both the properties and the URI.
        S3URI uri(path);
        RETURN_IF_ERROR_RESULT(uri.parse());
        S3Conf conf;
        RETURN_IF_ERROR_RESULT(S3ClientFactory::convert_properties_to_s3_conf(
                *fs_properties.properties, uri, &conf));
        return io::S3FileSystem::create(std::move(conf), io::FileSystem::TMP_FS_ID);
    }
    case TFileType::FILE_HDFS: {
        // Prefer the fs name embedded in the path over the configured one.
        std::string hdfs_fs_name = get_fs_name(file_description);
        return io::HdfsFileSystem::create(*fs_properties.properties, hdfs_fs_name,
                                          io::FileSystem::TMP_FS_ID, nullptr);
    }
    case TFileType::FILE_HTTP: {
        const auto& props = *fs_properties.properties;
        const auto uri_entry = props.find("uri");
        const bool has_uri = uri_entry != props.end() && !uri_entry->second.empty();
        if (!has_uri) {
            return ResultError(Status::InternalError("http fs must set uri property"));
        }
        return io::HttpFileSystem::create(uri_entry->second, io::FileSystem::TMP_FS_ID, props);
    }
    default:
        return ResultError(Status::InternalError("unsupported fs type: {}",
                                                 std::to_string(fs_properties.type)));
    }
}
156
157
76.5k
// Returns the file system name to use for `file_description`.
//
// When the path carries a scheme and a path component after the authority, the
// "scheme://authority" prefix of the path wins, because the default
// `file_description.fs_name` may differ from the file's own. Otherwise
// `file_description.fs_name` is returned unchanged.
// Examples:
//    hdfs://host:port/path1/path2   --> hdfs://host:port
//    hdfs://nameservice/path1/path2 --> hdfs://nameservice
std::string FileFactory::get_fs_name(const io::FileDescription& file_description) {
    const std::string& path = file_description.path;

    const std::string::size_type scheme_pos = path.find("://");
    if (scheme_pos == std::string::npos) {
        return file_description.fs_name;
    }

    // Skip the three characters of "://" and look for the end of the authority.
    const std::string::size_type authority_end = path.find('/', scheme_pos + 3);
    if (authority_end == std::string::npos) {
        return file_description.fs_name;
    }

    return path.substr(0, authority_end);
}
175
176
// Computes an identity string for the storage behind `file_description`.
//
//   - FILE_HDFS: the result of get_fs_name(), with an empty
//     `file_description.fs_name` first replaced by
//     `system_properties.hdfs_params.fs_name`.
//   - FILE_S3: "s3" followed by the AWS_ENDPOINT, AWS_REGION and provider
//     properties that are present (see append_identity_property).
//   - anything else: `file_description.fs_name`.
std::string FileFactory::get_file_cache_identity(const io::FileSystemProperties& system_properties,
                                                 const io::FileDescription& file_description) {
    const auto type = system_properties.system_type;

    if (type == TFileType::FILE_HDFS) {
        if (!file_description.fs_name.empty()) {
            return get_fs_name(file_description);
        }
        // Fall back to the fs name configured in the HDFS params.
        io::FileDescription with_default_fs = file_description;
        with_default_fs.fs_name = system_properties.hdfs_params.fs_name;
        return get_fs_name(with_default_fs);
    }

    if (type == TFileType::FILE_S3) {
        const StringCaseMap<std::string> props(system_properties.properties.begin(),
                                               system_properties.properties.end());
        std::string identity = "s3";
        for (const char* key : {"AWS_ENDPOINT", "AWS_REGION", "provider"}) {
            append_identity_property(props, key, &identity);
        }
        return identity;
    }

    return file_description.fs_name;
}
199
200
// Creates a writer for `path` on the storage kind given by `type`.
//
// `broker_addresses` and `env` are only used for FILE_BROKER; `properties`
// configures the broker, S3 and HDFS writers; `options` is passed by address
// to the S3 and HDFS writers. Unsupported types and an empty broker list yield
// an error result.
Result<io::FileWriterPtr> FileFactory::create_file_writer(
        TFileType::type type, ExecEnv* env, const std::vector<TNetworkAddress>& broker_addresses,
        const std::map<std::string, std::string>& properties, const std::string& path,
        const io::FileWriterOptions& options) {
    switch (type) {
    case TFileType::FILE_LOCAL: {
        io::FileWriterPtr local_writer;
        RETURN_IF_ERROR_RESULT(io::global_local_filesystem()->create_file(path, &local_writer));
        return local_writer;
    }
    case TFileType::FILE_BROKER: {
        const auto broker_idx = get_broker_index(broker_addresses, path);
        if (broker_idx < 0) {
            return ResultError(Status::InternalError("empty broker_addresses"));
        }
        const auto& broker = broker_addresses[broker_idx];
        LOG_INFO("select broker: {} for file {}", broker.hostname, path);
        return io::BrokerFileWriter::create(env, broker, properties, path);
    }
    case TFileType::FILE_S3: {
        S3URI uri(path);
        RETURN_IF_ERROR_RESULT(uri.parse());
        S3Conf conf;
        RETURN_IF_ERROR_RESULT(
                S3ClientFactory::convert_properties_to_s3_conf(properties, uri, &conf));
        // The client configuration and bucket are not needed after this point,
        // so both are moved out of `conf`.
        auto obj_client = std::make_shared<io::ObjClientHolder>(std::move(conf.client_conf));
        RETURN_IF_ERROR_RESULT(obj_client->init());
        return std::make_unique<io::S3FileWriter>(std::move(obj_client), std::move(conf.bucket),
                                                  uri.get_key(), &options);
    }
    case TFileType::FILE_HDFS: {
        THdfsParams hdfs_params = parse_properties(properties);
        std::shared_ptr<io::HdfsHandler> hdfs_handler;
        // NOTE(review): this uses ExecEnv::GetInstance() rather than `env` — confirm intended.
        RETURN_IF_ERROR_RESULT(ExecEnv::GetInstance()->hdfs_mgr()->get_or_create_fs(
                hdfs_params, hdfs_params.fs_name, &hdfs_handler));
        return io::HdfsFileWriter::create(path, hdfs_handler, hdfs_params.fs_name, &options);
    }
    default:
        return ResultError(
                Status::InternalError("unsupported file writer type: {}", std::to_string(type)));
    }
}
241
242
// Public entry point for creating a file reader: delegates to
// _create_file_reader_internal() and forwards either its reader or its error.
Result<io::FileReaderSPtr> FileFactory::create_file_reader(
        const io::FileSystemProperties& system_properties,
        const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
        RuntimeProfile* profile) {
    auto result = _create_file_reader_internal(system_properties, file_description,
                                               reader_options, profile);
    if (result.has_value()) {
        return std::move(result).value();
    }
    return unexpected(std::move(result).error());
}
253
254
// Creates a reader for `file_description` on the storage kind given by
// `system_properties.system_type`.
//
// S3, HDFS and HTTP readers are wrapped by io::create_cached_file_reader()
// using `reader_options`; local and broker readers are opened through their
// file system with `reader_options` passed directly. Unsupported types, an
// empty broker list, or any failing step yield an error result.
Result<io::FileReaderSPtr> FileFactory::_create_file_reader_internal(
        const io::FileSystemProperties& system_properties,
        const io::FileDescription& file_description, const io::FileReaderOptions& reader_options,
        RuntimeProfile* profile) {
    TFileType::type type = system_properties.system_type;
    switch (type) {
    case TFileType::FILE_LOCAL: {
        io::FileReaderSPtr file_reader;
        RETURN_IF_ERROR_RESULT(io::global_local_filesystem()->open_file(
                file_description.path, &file_reader, &reader_options));
        return file_reader;
    }
    case TFileType::FILE_S3: {
        S3URI s3_uri(file_description.path);
        RETURN_IF_ERROR_RESULT(s3_uri.parse());
        S3Conf s3_conf;
        RETURN_IF_ERROR_RESULT(S3ClientFactory::convert_properties_to_s3_conf(
                system_properties.properties, s3_uri, &s3_conf));
        // `s3_conf` is a local that is not used again after the reader is
        // created, so move its members instead of copying them (this matches
        // what create_file_writer() does).
        auto client_holder =
                std::make_shared<io::ObjClientHolder>(std::move(s3_conf.client_conf));
        RETURN_IF_ERROR_RESULT(client_holder->init());
        return io::S3FileReader::create(std::move(client_holder), std::move(s3_conf.bucket),
                                        s3_uri.get_key(), file_description.file_size, profile)
                .and_then([&](auto&& reader) {
                    return io::create_cached_file_reader(std::move(reader), reader_options);
                });
    }
    case TFileType::FILE_HDFS: {
        std::shared_ptr<io::HdfsHandler> handler;
        // FIXME(plat1ko): Explain the difference between `system_properties.hdfs_params.fs_name`
        // and `file_description.fs_name`; it is confusing.
        const std::string fs_name = get_file_cache_identity(system_properties, file_description);
        RETURN_IF_ERROR_RESULT(ExecEnv::GetInstance()->hdfs_mgr()->get_or_create_fs(
                system_properties.hdfs_params, fs_name, &handler));
        return io::HdfsFileReader::create(file_description.path, handler->hdfs_fs, fs_name,
                                          reader_options, profile)
                .and_then([&](auto&& reader) {
                    return io::create_cached_file_reader(std::move(reader), reader_options);
                });
    }
    case TFileType::FILE_BROKER: {
        auto index = get_broker_index(system_properties.broker_addresses, file_description.path);
        if (index < 0) {
            return ResultError(Status::InternalError("empty broker_addresses"));
        }
        LOG_INFO("select broker: {} for file {}",
                 system_properties.broker_addresses[index].hostname, file_description.path);
        // TODO(plat1ko): Create `FileReader` without FS
        return io::BrokerFileSystem::create(system_properties.broker_addresses[index],
                                            system_properties.properties, io::FileSystem::TMP_FS_ID)
                .and_then([&](auto&& fs) -> Result<io::FileReaderSPtr> {
                    io::FileReaderSPtr file_reader;
                    RETURN_IF_ERROR_RESULT(
                            fs->open_file(file_description.path, &file_reader, &reader_options));
                    return file_reader;
                });
    }
    case TFileType::FILE_HTTP: {
        return io::HttpFileReader::create(file_description.path, system_properties.properties,
                                          reader_options, profile)
                .and_then([&](auto&& reader) {
                    return io::create_cached_file_reader(std::move(reader), reader_options);
                });
    }
    default:
        return ResultError(
                Status::InternalError("unsupported file reader type: {}", std::to_string(type)));
    }
}
322
323
// file scan node/stream load pipe
324
// Stores in `*file_reader` a pipe-backed reader for the stream load `load_id`.
//
// Returns InternalError when no stream load context is registered for
// `load_id`. When `need_schema` is false the context's own pipe is used.
// When it is true, a new pipe is created and filled with the context's schema
// buffer only, then finished. `runtime_state` is not used here.
Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader,
                                       RuntimeState* runtime_state, bool need_schema) {
    auto ctx = ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
    if (!ctx) {
        return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
    }

    if (!need_schema) {
        *file_reader = ctx->pipe;
        return Status::OK();
    }

    RETURN_IF_ERROR(ctx->allocate_schema_buffer());
    // Here, a portion of the data is processed to parse column information.
    // The buffer position is read before flip() and used as the pipe's total length.
    auto schema_pipe = std::make_shared<io::StreamLoadPipe>(
            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
            ctx->schema_buffer()->pos /* total_length */);
    ctx->schema_buffer()->flip();
    RETURN_IF_ERROR(schema_pipe->append(ctx->schema_buffer()));
    RETURN_IF_ERROR(schema_pipe->finish());
    *file_reader = std::move(schema_pipe);

    return Status::OK();
}
346
347
} // namespace doris