be/src/io/fs/hdfs_file_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/hdfs_file_writer.h" |
19 | | |
20 | | #include <fcntl.h> |
21 | | #include <fmt/core.h> |
22 | | |
23 | | #include <chrono> |
24 | | #include <filesystem> |
25 | | #include <ostream> |
26 | | #include <random> |
27 | | #include <string> |
28 | | #include <thread> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/config.h" |
32 | | #include "common/logging.h" |
33 | | #include "common/status.h" |
34 | | #include "cpp/sync_point.h" |
35 | | #include "io/cache/block_file_cache.h" |
36 | | #include "io/cache/block_file_cache_factory.h" |
37 | | #include "io/cache/file_cache_common.h" |
38 | | #include "io/fs/err_utils.h" |
39 | | #include "io/fs/file_writer.h" |
40 | | #include "io/fs/hdfs_file_system.h" |
41 | | #include "io/hdfs_util.h" |
42 | | #include "runtime/exec_env.h" |
43 | | #include "service/backend_options.h" |
44 | | #include "util/bvar_helper.h" |
45 | | #include "util/jni-util.h" |
46 | | |
namespace doris::io {

// Process-wide bvar metrics exported for monitoring HDFS writer activity.
bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
bvar::Adder<uint64_t> inflight_hdfs_file_writer("inflight_hdfs_file_writer");
bvar::Adder<uint64_t> hdfs_file_writer_async_close_queuing("hdfs_file_writer_async_close_queuing");
bvar::Adder<uint64_t> hdfs_file_writer_async_close_processing(
        "hdfs_file_writer_async_close_processing");

static constexpr size_t MB = 1024 * 1024;
#ifndef USE_LIBHDFS3
// Minimum JNI-side buffering accounted per write; matches the HDFS client's
// write packet size (see _acquire_jni_memory).
static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
#endif
61 | | |
// Builds a random engine seeded from the steady clock so concurrent writers
// back off with de-correlated sleep durations.
inline std::default_random_engine make_random_engine() {
    const auto seed_ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    std::default_random_engine engine {static_cast<uint32_t>(seed_ticks)};
    return engine;
}
66 | | |
67 | | // In practice, we've found that if the import frequency to HDFS is too fast, |
68 | | // it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. |
69 | | // For this, we should have a method to monitor how much JVM memory is currently being used. |
70 | | // The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. |
71 | | // The HDFS client will blockingly call hdfsHsync or hdfsCloseFile |
72 | | // which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. |
73 | | // HdfsWriteMemUsageRecorder would reduce the mem usage at that time. |
74 | | // If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. |
75 | | // The caller could do sleep to wait for free memory. |
76 | | class HdfsWriteMemUsageRecorder { |
77 | | public: |
78 | 9 | HdfsWriteMemUsageRecorder() = default; |
79 | 0 | ~HdfsWriteMemUsageRecorder() = default; |
80 | 16.1k | size_t max_usage() const { |
81 | 16.1k | return static_cast<size_t>(static_cast<double>(max_jvm_heap_size()) * |
82 | 16.1k | config::max_hdfs_wirter_jni_heap_usage_ratio); |
83 | 16.1k | } |
84 | 2.74k | Status acquire_memory(size_t memory_size, int try_time) { |
85 | | #if defined(USE_LIBHDFS3) || defined(BE_TEST) |
86 | | return Status::OK(); |
87 | | #else |
88 | 2.74k | if (!config::enable_hdfs_mem_limiter) { |
89 | 0 | return Status::OK(); |
90 | 0 | } |
91 | 2.74k | auto unit = config::hdfs_jni_write_sleep_milliseconds; |
92 | 2.74k | std::default_random_engine rng = make_random_engine(); |
93 | 2.74k | std::uniform_int_distribution<int64_t> u(unit, 2 * unit); |
94 | 2.74k | std::uniform_int_distribution<int64_t> u2(2 * unit, 4 * unit); |
95 | 2.74k | auto duration_ms = |
96 | 2.74k | try_time < (config::hdfs_jni_write_max_retry_time / 2) ? u(rng) : u2(rng); |
97 | 2.74k | std::unique_lock lck {cur_memory_latch}; |
98 | 2.74k | cv.wait_for(lck, std::chrono::milliseconds(duration_ms), |
99 | 2.74k | [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); |
100 | 2.74k | if (cur_memory_comsuption + memory_size > max_usage()) { |
101 | 0 | lck.unlock(); |
102 | 0 | return Status::InternalError<false>( |
103 | 0 | "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, " |
104 | 0 | "ratio is {}", |
105 | 0 | max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio); |
106 | 0 | } |
107 | 2.74k | cur_memory_comsuption += memory_size; |
108 | 2.74k | return Status::OK(); |
109 | 2.74k | #endif |
110 | 2.74k | } |
111 | | |
112 | 5.33k | void release_memory(size_t memory_size) { |
113 | | #if defined(USE_LIBHDFS3) || defined(BE_TEST) |
114 | | #else |
115 | 5.33k | if (!config::enable_hdfs_mem_limiter) { |
116 | 0 | return; |
117 | 0 | } |
118 | 5.33k | std::unique_lock lck {cur_memory_latch}; |
119 | 5.33k | size_t origin_size = cur_memory_comsuption; |
120 | 5.33k | cur_memory_comsuption -= memory_size; |
121 | 5.33k | if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) { |
122 | 0 | cv.notify_all(); |
123 | 0 | } |
124 | 5.33k | #endif |
125 | 5.33k | } |
126 | | |
127 | | private: |
128 | | // clang-format off |
129 | 16.1k | size_t max_jvm_heap_size() const { |
130 | 16.1k | return Jni::Util::get_max_jni_heap_memory_size(); |
131 | 16.1k | } |
132 | | // clang-format on |
133 | | [[maybe_unused]] std::size_t cur_memory_comsuption {0}; |
134 | | std::mutex cur_memory_latch; |
135 | | std::condition_variable cv; |
136 | | }; |
137 | | |
138 | | static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter; |
139 | | |
// Takes ownership of an already-opened `hdfs_file` (see create()). `opts` may
// be null; then sync-on-close defaults to true and no file-cache is attached.
HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
                               std::string fs_name, const FileWriterOptions* opts)
        : _path(std::move(path)),
          _hdfs_handler(std::move(handler)),
          _hdfs_file(hdfs_file),
          _fs_name(std::move(fs_name)),
          _sync_file_data(opts ? opts->sync_file_data : true),
          // Batch buffer capacity is configured in MB.
          _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) {
    init_cache_builder(opts, _path);
    hdfs_file_writer_total << 1;

    TEST_SYNC_POINT("HdfsFileWriter");
}
153 | | |
HdfsFileWriter::~HdfsFileWriter() {
    // Join any in-flight async close first so the pool worker never touches a
    // destroyed object.
    if (_async_close_pack != nullptr) {
        // For thread safety
        std::ignore = _async_close_pack->future.get();
        _async_close_pack = nullptr;
    }
    // A non-null handle here means close() never completed; close best-effort
    // (the return value is deliberately ignored in a destructor) and hand the
    // JNI memory budget back.
    if (_hdfs_file) {
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
        hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
        inflight_hdfs_file_writer << -1;
        _flush_and_reset_approximate_jni_buffer_size();
    }
}
167 | | |
168 | 5.34k | void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() { |
169 | 5.34k | g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size); |
170 | 5.34k | _approximate_jni_buffer_size = 0; |
171 | 5.34k | } |
172 | | |
// Reserves JNI heap budget before buffering `size` bytes for hdfsWrite. If
// the first reservation fails, bytes this writer already holds in the JNI
// layer are hflush-ed (returning them to the budget), then the reservation is
// retried up to hdfs_jni_write_max_retry_time times. No-op under libhdfs3
// (no JVM involved).
Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
#ifdef USE_LIBHDFS3
    return Status::OK();
#else
    // The JNI client buffers at least one write packet, so account for at
    // least CLIENT_WRITE_PACKET_SIZE even for tiny appends.
    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
    int try_time = 0;
    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time); !st.ok()) {
        if (_approximate_jni_buffer_size > 0) {
            int ret;
            {
                SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency);
                ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
                                                   "HdfsFileWriter::close::hdfsHFlush");
            }
            // The budget is handed back whether or not the flush succeeded;
            // a failed flush aborts the write just below.
            _flush_and_reset_approximate_jni_buffer_size();
            if (ret != 0) {
                return Status::InternalError(
                        "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, "
                        "file_size={}",
                        BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
                        bytes_appended());
            }
        }
        // Other hdfs writers might have occupied too much memory, we need to sleep for a while to wait for them
        // releasing their memory
        for (; try_time < config::hdfs_jni_write_max_retry_time; try_time++) {
            if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time).ok()) {
                _approximate_jni_buffer_size += actual_size;
                return Status::OK();
            }
        }
        // All retries exhausted: surface the original failure status.
        return st;
    }

    _approximate_jni_buffer_size += actual_size;
    return Status::OK();
#endif
}
211 | | |
// Closes the writer. With non_block == true the heavy close work is submitted
// to the BE-wide non-block-close thread pool and this call returns the submit
// status immediately; a later close(false) (or the destructor) joins on the
// result. With non_block == false the close runs inline.
Status HdfsFileWriter::close(bool non_block) {
    if (state() == State::CLOSED) {
        return Status::InternalError("HdfsFileWriter already closed, file path {}, fs name {}",
                                     _path.native(), _fs_name);
    }
    if (state() == State::ASYNC_CLOSING) {
        if (non_block) {
            return Status::InternalError("Don't submit async close multi times");
        }
        CHECK(_async_close_pack != nullptr);
        _st = _async_close_pack->future.get();
        _async_close_pack = nullptr;
        // We should wait for all the pre async task to be finished
        _state = State::CLOSED;
        // The next time we call close() with no matter non_block true or false, it would always return the
        // '_st' value because this writer is already closed.
        return _st;
    }
    if (non_block) {
        _state = State::ASYNC_CLOSING;
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
        _async_close_pack->future = _async_close_pack->promise.get_future();
        hdfs_file_writer_async_close_queuing << 1;
        // NOTE(review): the lambda captures `this` by reference; the
        // destructor blocks on the future, keeping the object alive until the
        // task runs. If submit_func itself fails, the promise is never set
        // and the destructor would wait forever — confirm the pool cannot
        // reject work here.
        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
            hdfs_file_writer_async_close_queuing << -1;
            hdfs_file_writer_async_close_processing << 1;
            _async_close_pack->promise.set_value(_close_impl());
            hdfs_file_writer_async_close_processing << -1;
        });
    }
    _st = _close_impl();
    _state = State::CLOSED;
    return _st;
}
246 | | |
// Inline close: flush the remaining batch buffer, optionally sync data to the
// datanodes (_sync_file_data), then close the HDFS file handle. Stores the
// failure in `_st` so subsequent close() calls report the same error.
Status HdfsFileWriter::_close_impl() {
    if (_batch_buffer.size() != 0) {
        if (_st = _flush_buffer(); !_st.ok()) {
            return _st;
        }
    }
    int ret;
    if (_sync_file_data) {
        {
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
#ifdef USE_LIBHDFS3
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file),
                                               "HdfsFileWriter::close::hdfsHSync");
#else
            // hdfsHSync blocks until datanodes acknowledge, so the JNI-side
            // buffer accounting can be reset right after it returns.
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file),
                                               "HdfsFileWriter::close::hdfsHSync");
            _flush_and_reset_approximate_jni_buffer_size();
#endif
        }
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsSync",
                                               Status::InternalError("failed to sync hdfs file"));

        if (ret != 0) {
            _st = Status::InternalError(
                    "failed to sync hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name,
                    _path.native(), hdfs_error(), bytes_appended());
            return _st;
        }
    }

    {
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file),
                                           "HdfsFileWriter::close::hdfsCloseFile");
        inflight_hdfs_file_writer << -1;
        _flush_and_reset_approximate_jni_buffer_size();
    }
    // The handle is invalid after hdfsCloseFile regardless of `ret`; drop it
    // before the error check so the destructor never double-closes.
    _hdfs_file = nullptr;
    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
                                           Status::InternalError("failed to close hdfs file"));
    if (ret != 0) {
        _st = Status::InternalError(
                "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}",
                BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
                bytes_appended());
        return _st;
    }
    hdfs_file_created_total << 1;
    return Status::OK();
}
299 | | |
// Pre-allocates the staging buffer. Note: std::string may reserve more than
// requested, and capacity() below reports the actual amount.
HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) {
    _batch_buffer.reserve(capacity);
}
303 | | |
304 | 218k | bool HdfsFileWriter::BatchBuffer::full() const { |
305 | 218k | return size() == capacity(); |
306 | 218k | } |
307 | | |
308 | 0 | const char* HdfsFileWriter::BatchBuffer::data() const { |
309 | 0 | return _batch_buffer.data(); |
310 | 0 | } |
311 | | |
// Bytes the buffer can hold without reallocating (at least the size reserved
// at construction).
size_t HdfsFileWriter::BatchBuffer::capacity() const {
    return _batch_buffer.capacity();
}
315 | | |
// Number of bytes currently staged and not yet flushed.
size_t HdfsFileWriter::BatchBuffer::size() const {
    return _batch_buffer.size();
}
319 | | |
320 | 2.75k | void HdfsFileWriter::BatchBuffer::clear() { |
321 | 2.75k | _batch_buffer.clear(); |
322 | 2.75k | } |
323 | | |
324 | | // TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code |
// Mirrors the just-flushed batch buffer into the local file cache so later
// reads can hit the cache instead of HDFS. Cache append failures are logged
// and swallowed — the HDFS write already succeeded; the cache is best-effort.
void HdfsFileWriter::_write_into_local_file_cache() {
    int64_t tablet_id = get_tablet_id(_path.native()).value_or(0);
    // The buffer covers file offsets [_bytes_appended - size, _bytes_appended);
    // the holder is sized by capacity() — presumably cache blocks are
    // allocated at buffer-capacity granularity. TODO confirm.
    auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - _batch_buffer.size(),
                                                        _batch_buffer.capacity(), tablet_id);
    size_t pos = 0;
    size_t data_remain_size = _batch_buffer.size();
    for (auto& block : holder->file_blocks) {
        if (data_remain_size == 0) {
            break;
        }
        size_t block_size = block->range().size();
        size_t append_size = std::min(data_remain_size, block_size);
        if (block->state() == FileBlock::State::EMPTY) {
            block->get_or_set_downloader();
            // Only the winning downloader fills the block; concurrent writers
            // of the same range skip it.
            if (block->is_downloader()) {
                Slice s(_batch_buffer.data() + pos, append_size);
                Status st = block->append(s);
                if (st.ok()) {
                    st = block->finalize();
                }
                if (!st.ok()) {
                    LOG_WARNING("failed to append data to file cache").error(st);
                }
            }
        }
        data_remain_size -= append_size;
        pos += append_size;
    }
}
354 | | |
// Writes `content` to the HDFS file, looping until every byte is accepted:
// hdfsWrite may perform short writes, and its length argument is a 32-bit
// tSize, so large buffers are chunked. Acquires JNI memory budget up front.
Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
    RETURN_IF_ERROR(_acquire_jni_memory(content.size()));
    while (!content.empty()) {
        int64_t written_bytes;
        {
            TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay");
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
            // Clamp the request to tSize to avoid overflow for buffers
            // larger than the int32 range.
            int64_t max_to_write = content.size();
            tSize to_write = static_cast<tSize>(std::min(
                    max_to_write, static_cast<int64_t>(std::numeric_limits<tSize>::max())));
            written_bytes = SYNC_POINT_HOOK_RETURN_VALUE(
                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), to_write),
                    "HdfsFileWriter::append_hdfs_file::hdfsWrite", content);
            {
                TEST_INJECTION_POINT_RETURN_WITH_VALUE(
                        "HdfsFileWriter::append_hdfs_file_error",
                        Status::InternalError(
                                "write hdfs failed. fs_name: {}, path: {}, error: inject error",
                                _fs_name, _path.native()));
            }
        }
        // Negative return from hdfsWrite signals an error.
        if (written_bytes < 0) {
            return Status::InternalError(
                    "write hdfs failed. fs_name: {}, path: {}, error: {}, file_size={}", _fs_name,
                    _path.native(), hdfs_error(), bytes_appended());
        }
        hdfs_bytes_written_total << written_bytes;
        content.remove_prefix(written_bytes);
    }
    return Status::OK();
}
386 | | |
387 | 2.75k | Status HdfsFileWriter::_flush_buffer() { |
388 | 2.75k | RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content())); |
389 | 2.75k | if (_cache_builder != nullptr) { |
390 | 0 | _write_into_local_file_cache(); |
391 | 0 | } |
392 | 2.75k | _batch_buffer.clear(); |
393 | 2.75k | return Status::OK(); |
394 | 2.75k | } |
395 | | |
396 | 109k | size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) { |
397 | 109k | size_t append_size = std::min(capacity() - size(), content.size()); |
398 | 109k | _batch_buffer.append(content.data(), append_size); |
399 | 109k | return append_size; |
400 | 109k | } |
401 | | |
402 | 2.75k | std::string_view HdfsFileWriter::BatchBuffer::content() const { |
403 | 2.75k | return _batch_buffer; |
404 | 2.75k | } |
405 | | |
406 | 109k | Status HdfsFileWriter::_append(std::string_view content) { |
407 | 218k | while (!content.empty()) { |
408 | 109k | if (_batch_buffer.full()) { |
409 | 0 | auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}", |
410 | 0 | _batch_buffer.capacity(), _batch_buffer.size()); |
411 | 0 | return Status::InternalError(error_msg); |
412 | 0 | } |
413 | 109k | size_t append_size = _batch_buffer.append(content); |
414 | 109k | content.remove_prefix(append_size); |
415 | 109k | _bytes_appended += append_size; |
416 | 109k | if (_batch_buffer.full()) { |
417 | 4 | RETURN_IF_ERROR(_flush_buffer()); |
418 | 4 | } |
419 | 109k | } |
420 | 109k | return Status::OK(); |
421 | 109k | } |
422 | | |
423 | 109k | Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) { |
424 | 109k | if (_state != State::OPENED) [[unlikely]] { |
425 | 0 | return Status::InternalError("append to closed file: {}", _path.native()); |
426 | 0 | } |
427 | | |
428 | 218k | for (size_t i = 0; i < data_cnt; i++) { |
429 | 109k | RETURN_IF_ERROR(_append({data[i].get_data(), data[i].get_size()})); |
430 | 109k | } |
431 | 109k | return Status::OK(); |
432 | 109k | } |
433 | | |
434 | | Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler, |
435 | | const std::string& fs_name, |
436 | 2.74k | const FileWriterOptions* opts) { |
437 | 2.74k | auto path = convert_path(full_path, fs_name); |
438 | | #ifdef USE_LIBHDFS3 |
439 | | std::string hdfs_dir = path.parent_path().string(); |
440 | | int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str()); |
441 | | if (exists != 0) { |
442 | | VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir; |
443 | | int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str()); |
444 | | if (ret != 0) { |
445 | | std::stringstream ss; |
446 | | ss << "create dir failed. " |
447 | | << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error(); |
448 | | LOG(WARNING) << ss.str(); |
449 | | return ResultError(Status::InternalError(ss.str())); |
450 | | } |
451 | | } |
452 | | #endif |
453 | | // open file |
454 | | |
455 | 2.74k | hdfsFile hdfs_file = nullptr; |
456 | 2.74k | { |
457 | 2.74k | SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency); |
458 | 2.74k | hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0); |
459 | 2.74k | } |
460 | 2.74k | if (hdfs_file == nullptr) { |
461 | 0 | std::stringstream ss; |
462 | 0 | ss << "open file failed. " |
463 | 0 | << " fs_name:" << fs_name << " path:" << path << ", err: " << hdfs_error(); |
464 | 0 | LOG(WARNING) << ss.str(); |
465 | 0 | return ResultError(Status::InternalError(ss.str())); |
466 | 0 | } |
467 | 2.74k | VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path; |
468 | 2.74k | inflight_hdfs_file_writer << 1; |
469 | 2.74k | return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts); |
470 | 2.74k | } |
471 | | |
472 | | } // namespace doris::io |