be/src/io/fs/hdfs_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/hdfs_file_writer.h" |
19 | | |
20 | | #include <fcntl.h> |
21 | | #include <fmt/core.h> |
22 | | |
23 | | #include <chrono> |
24 | | #include <filesystem> |
25 | | #include <ostream> |
26 | | #include <random> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/status.h" |
34 | | #include "cpp/sync_point.h" |
35 | | #include "io/cache/block_file_cache.h" |
36 | | #include "io/cache/block_file_cache_factory.h" |
37 | | #include "io/cache/file_cache_common.h" |
38 | | #include "io/fs/err_utils.h" |
39 | | #include "io/fs/file_writer.h" |
40 | | #include "io/fs/hdfs_file_system.h" |
41 | | #include "io/hdfs_util.h" |
42 | | #include "runtime/exec_env.h" |
43 | | #include "service/backend_options.h" |
44 | | #include "util/bvar_helper.h" |
45 | | #include "util/jni-util.h" |
46 | | |
47 | | namespace doris::io { |
48 | | #include "common/compile_check_begin.h" |
49 | | |
50 | | bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num"); |
51 | | bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written"); |
52 | | bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); |
53 | | bvar::Adder<uint64_t> inflight_hdfs_file_writer("inflight_hdfs_file_writer"); |
54 | | bvar::Adder<uint64_t> hdfs_file_writer_async_close_queuing("hdfs_file_writer_async_close_queuing"); |
55 | | bvar::Adder<uint64_t> hdfs_file_writer_async_close_processing( |
56 | | "hdfs_file_writer_async_close_processing"); |
57 | | |
58 | | static constexpr size_t MB = 1024 * 1024; |
59 | | #ifndef USE_LIBHDFS3 |
60 | | static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB |
61 | | #endif |
62 | | |
// Builds a std::default_random_engine seeded with the current steady-clock tick count,
// truncated to 32 bits.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    const auto seed = static_cast<uint32_t>(ticks);
    return std::default_random_engine(seed);
}
67 | | |
68 | | // In practice, we've found that if the import frequency to HDFS is too fast, |
69 | | // it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. |
70 | | // For this, we should have a method to monitor how much JVM memory is currently being used. |
71 | | // The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. |
72 | | // The HDFS client will blockingly call hdfsHsync or hdfsCloseFile |
73 | | // which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. |
74 | | // HdfsWriteMemUsageRecorder would reduce the mem usage at that time. |
75 | | // If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. |
76 | | // The caller could do sleep to wait for free memory. |
77 | | class HdfsWriteMemUsageRecorder { |
78 | | public: |
79 | 9 | HdfsWriteMemUsageRecorder() = default; |
80 | 0 | ~HdfsWriteMemUsageRecorder() = default; |
81 | 0 | size_t max_usage() const { |
82 | 0 | return static_cast<size_t>(static_cast<double>(max_jvm_heap_size()) * |
83 | 0 | config::max_hdfs_wirter_jni_heap_usage_ratio); |
84 | 0 | } |
85 | 0 | Status acquire_memory(size_t memory_size, int try_time) { |
86 | | #if defined(USE_LIBHDFS3) || defined(BE_TEST) |
87 | | return Status::OK(); |
88 | | #else |
89 | 0 | if (!config::enable_hdfs_mem_limiter) { |
90 | 0 | return Status::OK(); |
91 | 0 | } |
92 | 0 | auto unit = config::hdfs_jni_write_sleep_milliseconds; |
93 | 0 | std::default_random_engine rng = make_random_engine(); |
94 | 0 | std::uniform_int_distribution<int64_t> u(unit, 2 * unit); |
95 | 0 | std::uniform_int_distribution<int64_t> u2(2 * unit, 4 * unit); |
96 | 0 | auto duration_ms = |
97 | 0 | try_time < (config::hdfs_jni_write_max_retry_time / 2) ? u(rng) : u2(rng); |
98 | 0 | std::unique_lock lck {cur_memory_latch}; |
99 | 0 | cv.wait_for(lck, std::chrono::milliseconds(duration_ms), |
100 | 0 | [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); |
101 | 0 | if (cur_memory_comsuption + memory_size > max_usage()) { |
102 | 0 | lck.unlock(); |
103 | 0 | return Status::InternalError<false>( |
104 | 0 | "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, " |
105 | 0 | "ratio is {}", |
106 | 0 | max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio); |
107 | 0 | } |
108 | 0 | cur_memory_comsuption += memory_size; |
109 | 0 | return Status::OK(); |
110 | 0 | #endif |
111 | 0 | } |
112 | | |
113 | 0 | void release_memory(size_t memory_size) { |
114 | | #if defined(USE_LIBHDFS3) || defined(BE_TEST) |
115 | | #else |
116 | 0 | if (!config::enable_hdfs_mem_limiter) { |
117 | 0 | return; |
118 | 0 | } |
119 | 0 | std::unique_lock lck {cur_memory_latch}; |
120 | 0 | size_t origin_size = cur_memory_comsuption; |
121 | 0 | cur_memory_comsuption -= memory_size; |
122 | 0 | if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) { |
123 | 0 | cv.notify_all(); |
124 | 0 | } |
125 | 0 | #endif |
126 | 0 | } |
127 | | |
128 | | private: |
129 | | // clang-format off |
130 | 0 | size_t max_jvm_heap_size() const { |
131 | 0 | return Jni::Util::get_max_jni_heap_memory_size(); |
132 | 0 | } |
133 | | // clang-format on |
134 | | [[maybe_unused]] std::size_t cur_memory_comsuption {0}; |
135 | | std::mutex cur_memory_latch; |
136 | | std::condition_variable cv; |
137 | | }; |
138 | | |
139 | | static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter; |
140 | | |
141 | | HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file, |
142 | | std::string fs_name, const FileWriterOptions* opts) |
143 | 1 | : _path(std::move(path)), |
144 | 1 | _hdfs_handler(std::move(handler)), |
145 | 1 | _hdfs_file(hdfs_file), |
146 | 1 | _fs_name(std::move(fs_name)), |
147 | 1 | _sync_file_data(opts ? opts->sync_file_data : true), |
148 | 1 | _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) { |
149 | 1 | init_cache_builder(opts, _path); |
150 | 1 | hdfs_file_writer_total << 1; |
151 | | |
152 | 1 | TEST_SYNC_POINT("HdfsFileWriter"); |
153 | 1 | } |
154 | | |
155 | 1 | HdfsFileWriter::~HdfsFileWriter() { |
156 | 1 | if (_async_close_pack != nullptr) { |
157 | | // For thread safety |
158 | 0 | std::ignore = _async_close_pack->future.get(); |
159 | 0 | _async_close_pack = nullptr; |
160 | 0 | } |
161 | 1 | if (_hdfs_file) { |
162 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); |
163 | 0 | hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); |
164 | 0 | inflight_hdfs_file_writer << -1; |
165 | 0 | _flush_and_reset_approximate_jni_buffer_size(); |
166 | 0 | } |
167 | 1 | } |
168 | | |
169 | 2 | void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() { |
170 | 2 | g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size); |
171 | 2 | _approximate_jni_buffer_size = 0; |
172 | 2 | } |
173 | | |
174 | 5 | Status HdfsFileWriter::_acquire_jni_memory(size_t size) { |
175 | | #ifdef USE_LIBHDFS3 |
176 | | return Status::OK(); |
177 | | #else |
178 | 5 | size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size); |
179 | 5 | int try_time = 0; |
180 | 5 | if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time); !st.ok()) { |
181 | 0 | if (_approximate_jni_buffer_size > 0) { |
182 | 0 | int ret; |
183 | 0 | { |
184 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency); |
185 | 0 | ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file), |
186 | 0 | "HdfsFileWriter::close::hdfsHFlush"); |
187 | 0 | } |
188 | 0 | _flush_and_reset_approximate_jni_buffer_size(); |
189 | 0 | if (ret != 0) { |
190 | 0 | return Status::InternalError( |
191 | 0 | "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, " |
192 | 0 | "file_size={}", |
193 | 0 | BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(), |
194 | 0 | bytes_appended()); |
195 | 0 | } |
196 | 0 | } |
197 | | // Other hdfs writers might have occupied too much memory, we need to sleep for a while to wait for them |
198 | | // releasing their memory |
199 | 0 | for (; try_time < config::hdfs_jni_write_max_retry_time; try_time++) { |
200 | 0 | if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time).ok()) { |
201 | 0 | _approximate_jni_buffer_size += actual_size; |
202 | 0 | return Status::OK(); |
203 | 0 | } |
204 | 0 | } |
205 | 0 | return st; |
206 | 0 | } |
207 | | |
208 | 5 | _approximate_jni_buffer_size += actual_size; |
209 | 5 | return Status::OK(); |
210 | 5 | #endif |
211 | 5 | } |
212 | | |
213 | 1 | Status HdfsFileWriter::close(bool non_block) { |
214 | 1 | if (state() == State::CLOSED) { |
215 | 0 | return Status::InternalError("HdfsFileWriter already closed, file path {}, fs name {}", |
216 | 0 | _path.native(), _fs_name); |
217 | 0 | } |
218 | 1 | if (state() == State::ASYNC_CLOSING) { |
219 | 0 | if (non_block) { |
220 | 0 | return Status::InternalError("Don't submit async close multi times"); |
221 | 0 | } |
222 | 0 | CHECK(_async_close_pack != nullptr); |
223 | 0 | _st = _async_close_pack->future.get(); |
224 | 0 | _async_close_pack = nullptr; |
225 | | // We should wait for all the pre async task to be finished |
226 | 0 | _state = State::CLOSED; |
227 | | // The next time we call close() with no matter non_block true or false, it would always return the |
228 | | // '_st' value because this writer is already closed. |
229 | 0 | return _st; |
230 | 0 | } |
231 | 1 | if (non_block) { |
232 | 0 | _state = State::ASYNC_CLOSING; |
233 | 0 | _async_close_pack = std::make_unique<AsyncCloseStatusPack>(); |
234 | 0 | _async_close_pack->future = _async_close_pack->promise.get_future(); |
235 | 0 | hdfs_file_writer_async_close_queuing << 1; |
236 | 0 | return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() { |
237 | 0 | hdfs_file_writer_async_close_queuing << -1; |
238 | 0 | hdfs_file_writer_async_close_processing << 1; |
239 | 0 | _async_close_pack->promise.set_value(_close_impl()); |
240 | 0 | hdfs_file_writer_async_close_processing << -1; |
241 | 0 | }); |
242 | 0 | } |
243 | 1 | _st = _close_impl(); |
244 | 1 | _state = State::CLOSED; |
245 | 1 | return _st; |
246 | 1 | } |
247 | | |
248 | 0 | Status HdfsFileWriter::_close_impl() { |
249 | 0 | if (_batch_buffer.size() != 0) { |
250 | 0 | if (_st = _flush_buffer(); !_st.ok()) { |
251 | 0 | return _st; |
252 | 0 | } |
253 | 0 | } |
254 | 0 | int ret; |
255 | 0 | if (_sync_file_data) { |
256 | 0 | { |
257 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency); |
258 | | #ifdef USE_LIBHDFS3 |
259 | | ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file), |
260 | | "HdfsFileWriter::close::hdfsHSync"); |
261 | | #else |
262 | 0 | ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file), |
263 | 0 | "HdfsFileWriter::close::hdfsHSync"); |
264 | 0 | _flush_and_reset_approximate_jni_buffer_size(); |
265 | 0 | #endif |
266 | 0 | } |
267 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsSync", |
268 | 0 | Status::InternalError("failed to sync hdfs file")); |
269 | |
|
270 | 0 | if (ret != 0) { |
271 | 0 | _st = Status::InternalError( |
272 | 0 | "failed to sync hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name, |
273 | 0 | _path.native(), hdfs_error(), bytes_appended()); |
274 | 0 | return _st; |
275 | 0 | } |
276 | 0 | } |
277 | | |
278 | 0 | { |
279 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); |
280 | | // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for |
281 | | // the HDFS response, but won't guarantee the synchronization of data to HDFS. |
282 | 0 | ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file), |
283 | 0 | "HdfsFileWriter::close::hdfsCloseFile"); |
284 | 0 | inflight_hdfs_file_writer << -1; |
285 | 0 | _flush_and_reset_approximate_jni_buffer_size(); |
286 | 0 | } |
287 | 0 | _hdfs_file = nullptr; |
288 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile", |
289 | 0 | Status::InternalError("failed to close hdfs file")); |
290 | 0 | if (ret != 0) { |
291 | 0 | _st = Status::InternalError( |
292 | 0 | "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}", |
293 | 0 | BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(), |
294 | 0 | bytes_appended()); |
295 | 0 | return _st; |
296 | 0 | } |
297 | 0 | hdfs_file_created_total << 1; |
298 | 0 | return Status::OK(); |
299 | 0 | } |
300 | | |
301 | 1 | HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) { |
302 | 1 | _batch_buffer.reserve(capacity); |
303 | 1 | } |
304 | | |
305 | 2.41k | bool HdfsFileWriter::BatchBuffer::full() const { |
306 | 2.41k | return size() == capacity(); |
307 | 2.41k | } |
308 | | |
309 | 0 | const char* HdfsFileWriter::BatchBuffer::data() const { |
310 | 0 | return _batch_buffer.data(); |
311 | 0 | } |
312 | | |
313 | 3.61k | size_t HdfsFileWriter::BatchBuffer::capacity() const { |
314 | 3.61k | return _batch_buffer.capacity(); |
315 | 3.61k | } |
316 | | |
317 | 3.61k | size_t HdfsFileWriter::BatchBuffer::size() const { |
318 | 3.61k | return _batch_buffer.size(); |
319 | 3.61k | } |
320 | | |
321 | 5 | void HdfsFileWriter::BatchBuffer::clear() { |
322 | 5 | _batch_buffer.clear(); |
323 | 5 | } |
324 | | |
325 | | // TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code |
326 | 0 | void HdfsFileWriter::_write_into_local_file_cache() { |
327 | 0 | int64_t tablet_id = get_tablet_id(_path.native()).value_or(0); |
328 | 0 | auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - _batch_buffer.size(), |
329 | 0 | _batch_buffer.capacity(), tablet_id); |
330 | 0 | size_t pos = 0; |
331 | 0 | size_t data_remain_size = _batch_buffer.size(); |
332 | 0 | for (auto& block : holder->file_blocks) { |
333 | 0 | if (data_remain_size == 0) { |
334 | 0 | break; |
335 | 0 | } |
336 | 0 | size_t block_size = block->range().size(); |
337 | 0 | size_t append_size = std::min(data_remain_size, block_size); |
338 | 0 | if (block->state() == FileBlock::State::EMPTY) { |
339 | 0 | block->get_or_set_downloader(); |
340 | 0 | if (block->is_downloader()) { |
341 | 0 | Slice s(_batch_buffer.data() + pos, append_size); |
342 | 0 | Status st = block->append(s); |
343 | 0 | if (st.ok()) { |
344 | 0 | st = block->finalize(); |
345 | 0 | } |
346 | 0 | if (!st.ok()) { |
347 | 0 | LOG_WARNING("failed to append data to file cache").error(st); |
348 | 0 | } |
349 | 0 | } |
350 | 0 | } |
351 | 0 | data_remain_size -= append_size; |
352 | 0 | pos += append_size; |
353 | 0 | } |
354 | 0 | } |
355 | | |
356 | 0 | Status HdfsFileWriter::append_hdfs_file(std::string_view content) { |
357 | 0 | RETURN_IF_ERROR(_acquire_jni_memory(content.size())); |
358 | 0 | while (!content.empty()) { |
359 | 0 | int64_t written_bytes; |
360 | 0 | { |
361 | 0 | TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay"); |
362 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency); |
363 | 0 | int64_t max_to_write = content.size(); |
364 | 0 | tSize to_write = static_cast<tSize>(std::min( |
365 | 0 | max_to_write, static_cast<int64_t>(std::numeric_limits<tSize>::max()))); |
366 | 0 | written_bytes = SYNC_POINT_HOOK_RETURN_VALUE( |
367 | 0 | hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), to_write), |
368 | 0 | "HdfsFileWriter::append_hdfs_file::hdfsWrite", content); |
369 | 0 | { |
370 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE( |
371 | 0 | "HdfsFileWriter::append_hdfs_file_error", |
372 | 0 | Status::InternalError( |
373 | 0 | "write hdfs failed. fs_name: {}, path: {}, error: inject error", |
374 | 0 | _fs_name, _path.native())); |
375 | 0 | } |
376 | 0 | } |
377 | 0 | if (written_bytes < 0) { |
378 | 0 | return Status::InternalError( |
379 | 0 | "write hdfs failed. fs_name: {}, path: {}, error: {}, file_size={}", _fs_name, |
380 | 0 | _path.native(), hdfs_error(), bytes_appended()); |
381 | 0 | } |
382 | 0 | hdfs_bytes_written_total << written_bytes; |
383 | 0 | content.remove_prefix(written_bytes); |
384 | 0 | } |
385 | 0 | return Status::OK(); |
386 | 0 | } |
387 | | |
388 | 5 | Status HdfsFileWriter::_flush_buffer() { |
389 | 5 | RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content())); |
390 | 5 | if (_cache_builder != nullptr) { |
391 | 0 | _write_into_local_file_cache(); |
392 | 0 | } |
393 | 5 | _batch_buffer.clear(); |
394 | 5 | return Status::OK(); |
395 | 5 | } |
396 | | |
397 | 1.20k | size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) { |
398 | 1.20k | size_t append_size = std::min(capacity() - size(), content.size()); |
399 | 1.20k | _batch_buffer.append(content.data(), append_size); |
400 | 1.20k | return append_size; |
401 | 1.20k | } |
402 | | |
403 | 5 | std::string_view HdfsFileWriter::BatchBuffer::content() const { |
404 | 5 | return _batch_buffer; |
405 | 5 | } |
406 | | |
407 | 1.20k | Status HdfsFileWriter::_append(std::string_view content) { |
408 | 2.40k | while (!content.empty()) { |
409 | 1.20k | if (_batch_buffer.full()) { |
410 | 0 | auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}", |
411 | 0 | _batch_buffer.capacity(), _batch_buffer.size()); |
412 | 0 | return Status::InternalError(error_msg); |
413 | 0 | } |
414 | 1.20k | size_t append_size = _batch_buffer.append(content); |
415 | 1.20k | content.remove_prefix(append_size); |
416 | 1.20k | _bytes_appended += append_size; |
417 | 1.20k | if (_batch_buffer.full()) { |
418 | 4 | RETURN_IF_ERROR(_flush_buffer()); |
419 | 4 | } |
420 | 1.20k | } |
421 | 1.20k | return Status::OK(); |
422 | 1.20k | } |
423 | | |
424 | 1.20k | Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { |
425 | 1.20k | if (_state != State::OPENED) [[unlikely]] { |
426 | 0 | return Status::InternalError("append to closed file: {}", _path.native()); |
427 | 0 | } |
428 | | |
429 | 2.40k | for (size_t i = 0; i < data_cnt; i++) { |
430 | 1.20k | RETURN_IF_ERROR(_append({data[i].get_data(), data[i].get_size()})); |
431 | 1.20k | } |
432 | 1.20k | return Status::OK(); |
433 | 1.20k | } |
434 | | |
435 | | Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler, |
436 | | const std::string& fs_name, |
437 | 0 | const FileWriterOptions* opts) { |
438 | 0 | auto path = convert_path(full_path, fs_name); |
439 | | #ifdef USE_LIBHDFS3 |
440 | | std::string hdfs_dir = path.parent_path().string(); |
441 | | int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str()); |
442 | | if (exists != 0) { |
443 | | VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir; |
444 | | int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str()); |
445 | | if (ret != 0) { |
446 | | std::stringstream ss; |
447 | | ss << "create dir failed. " |
448 | | << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error(); |
449 | | LOG(WARNING) << ss.str(); |
450 | | return ResultError(Status::InternalError(ss.str())); |
451 | | } |
452 | | } |
453 | | #endif |
454 | | // open file |
455 | |
|
456 | 0 | hdfsFile hdfs_file = nullptr; |
457 | 0 | { |
458 | 0 | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency); |
459 | 0 | hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0); |
460 | 0 | } |
461 | 0 | if (hdfs_file == nullptr) { |
462 | 0 | std::stringstream ss; |
463 | 0 | ss << "open file failed. " |
464 | 0 | << " fs_name:" << fs_name << " path:" << path << ", err: " << hdfs_error(); |
465 | 0 | LOG(WARNING) << ss.str(); |
466 | 0 | return ResultError(Status::InternalError(ss.str())); |
467 | 0 | } |
468 | 0 | VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path; |
469 | 0 | inflight_hdfs_file_writer << 1; |
470 | 0 | return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts); |
471 | 0 | } |
472 | | #include "common/compile_check_end.h" |
473 | | |
474 | | } // namespace doris::io |