be/src/format/native/native_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/native/native_reader.h" |
19 | | |
20 | | #include <gen_cpp/data.pb.h> |
21 | | |
22 | | #include "core/block/block.h" |
23 | | #include "format/native/native_format.h" |
24 | | #include "io/file_factory.h" |
25 | | #include "io/fs/buffered_reader.h" |
26 | | #include "io/fs/file_reader.h" |
27 | | #include "io/fs/tracing_file_reader.h" |
28 | | #include "runtime/runtime_profile.h" |
29 | | #include "runtime/runtime_state.h" |
30 | | |
31 | | namespace doris { |
32 | | |
33 | | NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, |
34 | | const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state) |
35 | 9 | : _profile(profile), |
36 | 9 | _scan_params(params), |
37 | 9 | _scan_range(range), |
38 | 9 | _io_ctx(io_ctx), |
39 | 9 | _state(state) {} |
40 | | |
41 | 9 | NativeReader::~NativeReader() { |
42 | 9 | (void)close(); |
43 | 9 | } |
44 | | |
45 | | namespace { |
46 | | |
47 | | Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range, |
48 | | int64_t* file_size, int64_t* current_offset, bool* eof, |
49 | 9 | const io::IOContext* io_ctx) { |
50 | 9 | *file_size = file_reader->size(); |
51 | 9 | *current_offset = 0; |
52 | 9 | *eof = (*file_size == 0); |
53 | | |
54 | | // Validate and consume Doris Native file header. |
55 | | // Expected layout: |
56 | | // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]... |
57 | 9 | static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); |
58 | 9 | if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) { |
59 | 0 | return Status::InternalError( |
60 | 0 | "invalid Doris Native file {}, file size {} is smaller than header size {}", |
61 | 0 | range.path, *file_size, HEADER_SIZE); |
62 | 0 | } |
63 | | |
64 | 9 | char header[HEADER_SIZE]; |
65 | 9 | Slice header_slice(header, sizeof(header)); |
66 | 9 | size_t bytes_read = 0; |
67 | 9 | RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read, io_ctx)); |
68 | 9 | if (bytes_read != sizeof(header)) { |
69 | 0 | return Status::InternalError( |
70 | 0 | "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", |
71 | 0 | range.path, sizeof(header), bytes_read); |
72 | 0 | } |
73 | | |
74 | 9 | if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { |
75 | 0 | return Status::InternalError("invalid Doris Native magic header in file {}", range.path); |
76 | 0 | } |
77 | | |
78 | 9 | uint32_t version = 0; |
79 | 9 | memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); |
80 | 9 | if (version != DORIS_NATIVE_FORMAT_VERSION) { |
81 | 0 | return Status::InternalError( |
82 | 0 | "unsupported Doris Native format version {} in file {}, expect {}", version, |
83 | 0 | range.path, DORIS_NATIVE_FORMAT_VERSION); |
84 | 0 | } |
85 | | |
86 | 9 | *current_offset = sizeof(header); |
87 | 9 | *eof = (*file_size == *current_offset); |
88 | 9 | return Status::OK(); |
89 | 9 | } |
90 | | |
91 | | } // namespace |
92 | | |
93 | 88 | Status NativeReader::init_reader() { |
94 | 88 | if (_file_reader != nullptr) { |
95 | 79 | return Status::OK(); |
96 | 79 | } |
97 | | |
98 | | // Create underlying file reader. For now we always use random access mode. |
99 | 9 | io::FileSystemProperties system_properties; |
100 | 9 | io::FileDescription file_description; |
101 | 9 | file_description.file_size = -1; |
102 | 9 | if (_scan_range.__isset.file_size) { |
103 | 9 | file_description.file_size = _scan_range.file_size; |
104 | 9 | } |
105 | 9 | file_description.path = _scan_range.path; |
106 | 9 | if (_scan_range.__isset.fs_name) { |
107 | 0 | file_description.fs_name = _scan_range.fs_name; |
108 | 0 | } |
109 | 9 | if (_scan_range.__isset.modification_time) { |
110 | 0 | file_description.mtime = _scan_range.modification_time; |
111 | 9 | } else { |
112 | 9 | file_description.mtime = 0; |
113 | 9 | } |
114 | | |
115 | 9 | if (_scan_range.__isset.file_type) { |
116 | | // For compatibility with older FE. |
117 | 9 | system_properties.system_type = _scan_range.file_type; |
118 | 9 | } else { |
119 | 0 | system_properties.system_type = _scan_params.file_type; |
120 | 0 | } |
121 | 9 | system_properties.properties = _scan_params.properties; |
122 | 9 | system_properties.hdfs_params = _scan_params.hdfs_params; |
123 | 9 | if (_scan_params.__isset.broker_addresses) { |
124 | 0 | system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(), |
125 | 0 | _scan_params.broker_addresses.end()); |
126 | 0 | } |
127 | | |
128 | 9 | io::FileReaderOptions reader_options = |
129 | 9 | FileFactory::get_reader_options(_state, file_description); |
130 | 9 | auto reader_res = io::DelegateReader::create_file_reader( |
131 | 9 | _profile, system_properties, file_description, reader_options, |
132 | 9 | io::DelegateReader::AccessMode::RANDOM, _io_ctx); |
133 | 9 | if (!reader_res.has_value()) { |
134 | 0 | return reader_res.error(); |
135 | 0 | } |
136 | 9 | _file_reader = reader_res.value(); |
137 | | |
138 | 9 | if (_io_ctx) { |
139 | 0 | _file_reader = |
140 | 0 | std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats); |
141 | 0 | } |
142 | | |
143 | 9 | RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size, |
144 | 9 | &_current_offset, &_eof, _io_ctx)); |
145 | 9 | return Status::OK(); |
146 | 9 | } |
147 | | |
148 | 91 | Status NativeReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) { |
149 | 91 | if (_eof) { |
150 | 5 | *read_rows = 0; |
151 | 5 | *eof = true; |
152 | 5 | return Status::OK(); |
153 | 5 | } |
154 | | |
155 | 86 | RETURN_IF_ERROR(init_reader()); |
156 | | |
157 | 86 | std::string buff; |
158 | 86 | bool local_eof = false; |
159 | | |
160 | | // If we have already loaded the first block for schema probing, use it first. |
161 | 86 | if (_first_block_loaded && !_first_block_consumed) { |
162 | 0 | buff = _first_block_buf; |
163 | 0 | local_eof = false; |
164 | 86 | } else { |
165 | 86 | RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof)); |
166 | 86 | } |
167 | | |
168 | | // If we reach EOF and also read no data for this call, the whole file is considered finished. |
169 | 86 | if (local_eof && buff.empty()) { |
170 | 1 | *read_rows = 0; |
171 | 1 | *eof = true; |
172 | 1 | _eof = true; |
173 | 1 | return Status::OK(); |
174 | 1 | } |
175 | | // If buffer is empty but we have not reached EOF yet, treat this as an error. |
176 | 85 | if (buff.empty()) { |
177 | 0 | return Status::InternalError("read empty native block from file {}", _scan_range.path); |
178 | 0 | } |
179 | | |
180 | 85 | PBlock pblock; |
181 | 85 | if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) { |
182 | 0 | return Status::InternalError("Failed to parse native PBlock from file {}", |
183 | 0 | _scan_range.path); |
184 | 0 | } |
185 | | |
186 | | // Initialize schema from first block if not done yet. |
187 | 85 | if (!_schema_inited) { |
188 | 7 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
189 | 7 | } |
190 | | |
191 | 85 | size_t uncompressed_bytes = 0; |
192 | 85 | int64_t decompress_time = 0; |
193 | 85 | RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time)); |
194 | | |
195 | | // For external file scan / TVF scenarios, unify all columns as nullable to match |
196 | | // GenericReader/SlotDescriptor convention. This ensures schema consistency when |
197 | | // some writers emit non-nullable columns. |
198 | 635 | for (size_t i = 0; i < block->columns(); ++i) { |
199 | 550 | auto& col_with_type = block->get_by_position(i); |
200 | 550 | if (!col_with_type.type->is_nullable()) { |
201 | 2 | col_with_type.column = make_nullable(col_with_type.column); |
202 | 2 | col_with_type.type = make_nullable(col_with_type.type); |
203 | 2 | } |
204 | 550 | } |
205 | | |
206 | 85 | *read_rows = block->rows(); |
207 | 85 | *eof = false; |
208 | | |
209 | 85 | if (_first_block_loaded && !_first_block_consumed) { |
210 | 0 | _first_block_consumed = true; |
211 | 0 | } |
212 | | |
213 | | // If we reached the physical end of file, mark eof for subsequent calls. |
214 | 85 | if (_current_offset >= _file_size) { |
215 | 7 | _eof = true; |
216 | 7 | } |
217 | | |
218 | 85 | return Status::OK(); |
219 | 85 | } |
220 | | |
221 | 1 | Status NativeReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) { |
222 | 1 | RETURN_IF_ERROR(init_reader()); |
223 | | |
224 | 1 | if (!_schema_inited) { |
225 | | // Load first block lazily to initialize schema. |
226 | 1 | if (!_first_block_loaded) { |
227 | 1 | bool local_eof = false; |
228 | 1 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
229 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
230 | 1 | if (local_eof && _first_block_buf.empty()) { |
231 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
232 | 0 | } |
233 | | // Non-EOF but empty buffer means corrupted native file. |
234 | 1 | if (_first_block_buf.empty()) { |
235 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
236 | 0 | } |
237 | 1 | _first_block_loaded = true; |
238 | 1 | } |
239 | | |
240 | 1 | PBlock pblock; |
241 | 1 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
242 | 1 | static_cast<int>(_first_block_buf.size()))) { |
243 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
244 | 0 | _scan_range.path); |
245 | 0 | } |
246 | 1 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
247 | 1 | } |
248 | | |
249 | 7 | for (size_t i = 0; i < _schema_col_names.size(); ++i) { |
250 | 6 | name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]); |
251 | 6 | } |
252 | 1 | return Status::OK(); |
253 | 1 | } |
254 | | |
255 | 0 | Status NativeReader::init_schema_reader() { |
256 | 0 | RETURN_IF_ERROR(init_reader()); |
257 | 0 | return Status::OK(); |
258 | 0 | } |
259 | | |
260 | | Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names, |
261 | 1 | std::vector<DataTypePtr>* col_types) { |
262 | 1 | RETURN_IF_ERROR(init_reader()); |
263 | | |
264 | 1 | if (!_schema_inited) { |
265 | 0 | if (!_first_block_loaded) { |
266 | 0 | bool local_eof = false; |
267 | 0 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
268 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
269 | 0 | if (local_eof && _first_block_buf.empty()) { |
270 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
271 | 0 | } |
272 | | // Non-EOF but empty buffer means corrupted native file. |
273 | 0 | if (_first_block_buf.empty()) { |
274 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
275 | 0 | } |
276 | 0 | _first_block_loaded = true; |
277 | 0 | } |
278 | | |
279 | 0 | PBlock pblock; |
280 | 0 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
281 | 0 | static_cast<int>(_first_block_buf.size()))) { |
282 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
283 | 0 | _scan_range.path); |
284 | 0 | } |
285 | 0 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
286 | 0 | } |
287 | | |
288 | 1 | *col_names = _schema_col_names; |
289 | 1 | *col_types = _schema_col_types; |
290 | 1 | return Status::OK(); |
291 | 1 | } |
292 | | |
293 | 9 | Status NativeReader::close() { |
294 | 9 | _file_reader.reset(); |
295 | 9 | return Status::OK(); |
296 | 9 | } |
297 | | |
298 | 87 | Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) { |
299 | 87 | *eof = false; |
300 | 87 | buff->clear(); |
301 | | |
302 | 87 | if (_file_reader == nullptr) { |
303 | 0 | RETURN_IF_ERROR(init_reader()); |
304 | 0 | } |
305 | | |
306 | 87 | if (_current_offset >= _file_size) { |
307 | 1 | *eof = true; |
308 | 1 | return Status::OK(); |
309 | 1 | } |
310 | | |
311 | 86 | uint64_t len = 0; |
312 | 86 | Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); |
313 | 86 | size_t bytes_read = 0; |
314 | 86 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read, _io_ctx)); |
315 | 86 | if (bytes_read == 0) { |
316 | 0 | *eof = true; |
317 | 0 | return Status::OK(); |
318 | 0 | } |
319 | 86 | if (bytes_read != sizeof(len)) { |
320 | 0 | return Status::InternalError( |
321 | 0 | "Failed to read native block length from file {}, expect {}, " |
322 | 0 | "actual {}", |
323 | 0 | _scan_range.path, sizeof(len), bytes_read); |
324 | 0 | } |
325 | | |
326 | 86 | _current_offset += sizeof(len); |
327 | 86 | if (len == 0) { |
328 | | // Empty block, nothing to read. |
329 | 0 | *eof = (_current_offset >= _file_size); |
330 | 0 | return Status::OK(); |
331 | 0 | } |
332 | | |
333 | 86 | buff->assign(len, '\0'); |
334 | 86 | Slice data_slice(buff->data(), len); |
335 | 86 | bytes_read = 0; |
336 | 86 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read, _io_ctx)); |
337 | 86 | if (bytes_read != len) { |
338 | 0 | return Status::InternalError( |
339 | 0 | "Failed to read native block body from file {}, expect {}, " |
340 | 0 | "actual {}", |
341 | 0 | _scan_range.path, len, bytes_read); |
342 | 0 | } |
343 | | |
344 | 86 | _current_offset += len; |
345 | 86 | *eof = (_current_offset >= _file_size); |
346 | 86 | return Status::OK(); |
347 | 86 | } |
348 | | |
349 | 8 | Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) { |
350 | 8 | _schema_col_names.clear(); |
351 | 8 | _schema_col_types.clear(); |
352 | | |
353 | 88 | for (const auto& pcol_meta : pblock.column_metas()) { |
354 | 88 | DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta)); |
355 | 88 | VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name() |
356 | 0 | << ", type=" << type->get_name(); |
357 | 88 | _schema_col_names.emplace_back(pcol_meta.name()); |
358 | 88 | _schema_col_types.emplace_back(type); |
359 | 88 | } |
360 | 8 | _schema_inited = true; |
361 | 8 | return Status::OK(); |
362 | 8 | } |
363 | | |
364 | | } // namespace doris |