be/src/format/native/native_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include "format/native/native_reader.h"

#include <gen_cpp/data.pb.h>

#include <limits>

#include "core/block/block.h"
#include "format/native/native_format.h"
#include "io/file_factory.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/tracing_file_reader.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
30 | | |
31 | | namespace doris { |
32 | | |
33 | | #include "common/compile_check_begin.h" |
34 | | |
35 | | NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, |
36 | | const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state) |
37 | 14 | : _profile(profile), |
38 | 14 | _scan_params(params), |
39 | 14 | _scan_range(range), |
40 | 14 | _io_ctx(io_ctx), |
41 | 14 | _state(state) {} |
42 | | |
43 | 14 | NativeReader::~NativeReader() { |
44 | 14 | (void)close(); |
45 | 14 | } |
46 | | |
47 | | namespace { |
48 | | |
49 | | Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range, |
50 | 14 | int64_t* file_size, int64_t* current_offset, bool* eof) { |
51 | 14 | *file_size = file_reader->size(); |
52 | 14 | *current_offset = 0; |
53 | 14 | *eof = (*file_size == 0); |
54 | | |
55 | | // Validate and consume Doris Native file header. |
56 | | // Expected layout: |
57 | | // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]... |
58 | 14 | static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); |
59 | 14 | if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) { |
60 | 0 | return Status::InternalError( |
61 | 0 | "invalid Doris Native file {}, file size {} is smaller than header size {}", |
62 | 0 | range.path, *file_size, HEADER_SIZE); |
63 | 0 | } |
64 | | |
65 | 14 | char header[HEADER_SIZE]; |
66 | 14 | Slice header_slice(header, sizeof(header)); |
67 | 14 | size_t bytes_read = 0; |
68 | 14 | RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read)); |
69 | 14 | if (bytes_read != sizeof(header)) { |
70 | 0 | return Status::InternalError( |
71 | 0 | "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", |
72 | 0 | range.path, sizeof(header), bytes_read); |
73 | 0 | } |
74 | | |
75 | 14 | if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { |
76 | 0 | return Status::InternalError("invalid Doris Native magic header in file {}", range.path); |
77 | 0 | } |
78 | | |
79 | 14 | uint32_t version = 0; |
80 | 14 | memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); |
81 | 14 | if (version != DORIS_NATIVE_FORMAT_VERSION) { |
82 | 0 | return Status::InternalError( |
83 | 0 | "unsupported Doris Native format version {} in file {}, expect {}", version, |
84 | 0 | range.path, DORIS_NATIVE_FORMAT_VERSION); |
85 | 0 | } |
86 | | |
87 | 14 | *current_offset = sizeof(header); |
88 | 14 | *eof = (*file_size == *current_offset); |
89 | 14 | return Status::OK(); |
90 | 14 | } |
91 | | |
92 | | } // namespace |
93 | | |
94 | 108 | Status NativeReader::init_reader() { |
95 | 108 | if (_file_reader != nullptr) { |
96 | 94 | return Status::OK(); |
97 | 94 | } |
98 | | |
99 | | // Create underlying file reader. For now we always use random access mode. |
100 | 14 | io::FileSystemProperties system_properties; |
101 | 14 | io::FileDescription file_description; |
102 | 14 | file_description.file_size = -1; |
103 | 14 | if (_scan_range.__isset.file_size) { |
104 | 14 | file_description.file_size = _scan_range.file_size; |
105 | 14 | } |
106 | 14 | file_description.path = _scan_range.path; |
107 | 14 | if (_scan_range.__isset.fs_name) { |
108 | 0 | file_description.fs_name = _scan_range.fs_name; |
109 | 0 | } |
110 | 14 | if (_scan_range.__isset.modification_time) { |
111 | 5 | file_description.mtime = _scan_range.modification_time; |
112 | 9 | } else { |
113 | 9 | file_description.mtime = 0; |
114 | 9 | } |
115 | | |
116 | 14 | if (_scan_range.__isset.file_type) { |
117 | | // For compatibility with older FE. |
118 | 13 | system_properties.system_type = _scan_range.file_type; |
119 | 13 | } else { |
120 | 1 | system_properties.system_type = _scan_params.file_type; |
121 | 1 | } |
122 | 14 | system_properties.properties = _scan_params.properties; |
123 | 14 | system_properties.hdfs_params = _scan_params.hdfs_params; |
124 | 14 | if (_scan_params.__isset.broker_addresses) { |
125 | 1 | system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(), |
126 | 1 | _scan_params.broker_addresses.end()); |
127 | 1 | } |
128 | | |
129 | 14 | io::FileReaderOptions reader_options = |
130 | 14 | FileFactory::get_reader_options(_state, file_description); |
131 | 14 | auto reader_res = io::DelegateReader::create_file_reader( |
132 | 14 | _profile, system_properties, file_description, reader_options, |
133 | 14 | io::DelegateReader::AccessMode::RANDOM, _io_ctx); |
134 | 14 | if (!reader_res.has_value()) { |
135 | 0 | return reader_res.error(); |
136 | 0 | } |
137 | 14 | _file_reader = reader_res.value(); |
138 | | |
139 | 14 | if (_io_ctx) { |
140 | 5 | _file_reader = |
141 | 5 | std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats); |
142 | 5 | } |
143 | | |
144 | 14 | RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size, |
145 | 14 | &_current_offset, &_eof)); |
146 | 14 | return Status::OK(); |
147 | 14 | } |
148 | | |
149 | 104 | Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
150 | 104 | if (_eof) { |
151 | 8 | *read_rows = 0; |
152 | 8 | *eof = true; |
153 | 8 | return Status::OK(); |
154 | 8 | } |
155 | | |
156 | 96 | RETURN_IF_ERROR(init_reader()); |
157 | | |
158 | 96 | std::string buff; |
159 | 96 | bool local_eof = false; |
160 | | |
161 | | // If we have already loaded the first block for schema probing, use it first. |
162 | 96 | if (_first_block_loaded && !_first_block_consumed) { |
163 | 3 | buff = _first_block_buf; |
164 | 3 | local_eof = false; |
165 | 93 | } else { |
166 | 93 | RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof)); |
167 | 93 | } |
168 | | |
169 | | // If we reach EOF and also read no data for this call, the whole file is considered finished. |
170 | 96 | if (local_eof && buff.empty()) { |
171 | 1 | *read_rows = 0; |
172 | 1 | *eof = true; |
173 | 1 | _eof = true; |
174 | 1 | return Status::OK(); |
175 | 1 | } |
176 | | // If buffer is empty but we have not reached EOF yet, treat this as an error. |
177 | 95 | if (buff.empty()) { |
178 | 0 | return Status::InternalError("read empty native block from file {}", _scan_range.path); |
179 | 0 | } |
180 | | |
181 | 95 | PBlock pblock; |
182 | 95 | if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) { |
183 | 0 | return Status::InternalError("Failed to parse native PBlock from file {}", |
184 | 0 | _scan_range.path); |
185 | 0 | } |
186 | | |
187 | | // Initialize schema from first block if not done yet. |
188 | 95 | if (!_schema_inited) { |
189 | 7 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
190 | 7 | } |
191 | | |
192 | 95 | size_t uncompressed_bytes = 0; |
193 | 95 | int64_t decompress_time = 0; |
194 | 95 | RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time)); |
195 | | |
196 | | // For external file scan / TVF scenarios, unify all columns as nullable to match |
197 | | // GenericReader/SlotDescriptor convention. This ensures schema consistency when |
198 | | // some writers emit non-nullable columns. |
199 | 677 | for (size_t i = 0; i < block->columns(); ++i) { |
200 | 582 | auto& col_with_type = block->get_by_position(i); |
201 | 582 | if (!col_with_type.type->is_nullable()) { |
202 | 16 | col_with_type.column = make_nullable(col_with_type.column); |
203 | 16 | col_with_type.type = make_nullable(col_with_type.type); |
204 | 16 | } |
205 | 582 | } |
206 | | |
207 | 95 | *read_rows = block->rows(); |
208 | 95 | *eof = false; |
209 | | |
210 | 95 | if (_first_block_loaded && !_first_block_consumed) { |
211 | 3 | _first_block_consumed = true; |
212 | 3 | } |
213 | | |
214 | | // If we reached the physical end of file, mark eof for subsequent calls. |
215 | 95 | if (_current_offset >= _file_size) { |
216 | 10 | _eof = true; |
217 | 10 | } |
218 | | |
219 | 95 | return Status::OK(); |
220 | 95 | } |
221 | | |
222 | | Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
223 | 4 | std::unordered_set<std::string>* missing_cols) { |
224 | 4 | missing_cols->clear(); |
225 | 4 | RETURN_IF_ERROR(init_reader()); |
226 | | |
227 | 4 | if (!_schema_inited) { |
228 | | // Load first block lazily to initialize schema. |
229 | 4 | if (!_first_block_loaded) { |
230 | 4 | bool local_eof = false; |
231 | 4 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
232 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
233 | 4 | if (local_eof && _first_block_buf.empty()) { |
234 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
235 | 0 | } |
236 | | // Non-EOF but empty buffer means corrupted native file. |
237 | 4 | if (_first_block_buf.empty()) { |
238 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
239 | 0 | } |
240 | 4 | _first_block_loaded = true; |
241 | 4 | } |
242 | | |
243 | 4 | PBlock pblock; |
244 | 4 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
245 | 4 | static_cast<int>(_first_block_buf.size()))) { |
246 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
247 | 0 | _scan_range.path); |
248 | 0 | } |
249 | 4 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
250 | 4 | } |
251 | | |
252 | 22 | for (size_t i = 0; i < _schema_col_names.size(); ++i) { |
253 | 18 | name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]); |
254 | 18 | } |
255 | 4 | return Status::OK(); |
256 | 4 | } |
257 | | |
258 | 2 | Status NativeReader::init_schema_reader() { |
259 | 2 | RETURN_IF_ERROR(init_reader()); |
260 | 2 | return Status::OK(); |
261 | 2 | } |
262 | | |
263 | | Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names, |
264 | 3 | std::vector<DataTypePtr>* col_types) { |
265 | 3 | RETURN_IF_ERROR(init_reader()); |
266 | | |
267 | 3 | if (!_schema_inited) { |
268 | 2 | if (!_first_block_loaded) { |
269 | 2 | bool local_eof = false; |
270 | 2 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
271 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
272 | 2 | if (local_eof && _first_block_buf.empty()) { |
273 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
274 | 0 | } |
275 | | // Non-EOF but empty buffer means corrupted native file. |
276 | 2 | if (_first_block_buf.empty()) { |
277 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
278 | 0 | } |
279 | 2 | _first_block_loaded = true; |
280 | 2 | } |
281 | | |
282 | 2 | PBlock pblock; |
283 | 2 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
284 | 2 | static_cast<int>(_first_block_buf.size()))) { |
285 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
286 | 0 | _scan_range.path); |
287 | 0 | } |
288 | 2 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
289 | 2 | } |
290 | | |
291 | 3 | *col_names = _schema_col_names; |
292 | 3 | *col_types = _schema_col_types; |
293 | 3 | return Status::OK(); |
294 | 3 | } |
295 | | |
296 | 17 | Status NativeReader::close() { |
297 | 17 | _file_reader.reset(); |
298 | 17 | return Status::OK(); |
299 | 17 | } |
300 | | |
301 | 99 | Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) { |
302 | 99 | *eof = false; |
303 | 99 | buff->clear(); |
304 | | |
305 | 99 | if (_file_reader == nullptr) { |
306 | 0 | RETURN_IF_ERROR(init_reader()); |
307 | 0 | } |
308 | | |
309 | 99 | if (_current_offset >= _file_size) { |
310 | 1 | *eof = true; |
311 | 1 | return Status::OK(); |
312 | 1 | } |
313 | | |
314 | 98 | uint64_t len = 0; |
315 | 98 | Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); |
316 | 98 | size_t bytes_read = 0; |
317 | 98 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read)); |
318 | 98 | if (bytes_read == 0) { |
319 | 0 | *eof = true; |
320 | 0 | return Status::OK(); |
321 | 0 | } |
322 | 98 | if (bytes_read != sizeof(len)) { |
323 | 0 | return Status::InternalError( |
324 | 0 | "Failed to read native block length from file {}, expect {}, " |
325 | 0 | "actual {}", |
326 | 0 | _scan_range.path, sizeof(len), bytes_read); |
327 | 0 | } |
328 | | |
329 | 98 | _current_offset += sizeof(len); |
330 | 98 | if (len == 0) { |
331 | | // Empty block, nothing to read. |
332 | 0 | *eof = (_current_offset >= _file_size); |
333 | 0 | return Status::OK(); |
334 | 0 | } |
335 | | |
336 | 98 | buff->assign(len, '\0'); |
337 | 98 | Slice data_slice(buff->data(), len); |
338 | 98 | bytes_read = 0; |
339 | 98 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read)); |
340 | 98 | if (bytes_read != len) { |
341 | 0 | return Status::InternalError( |
342 | 0 | "Failed to read native block body from file {}, expect {}, " |
343 | 0 | "actual {}", |
344 | 0 | _scan_range.path, len, bytes_read); |
345 | 0 | } |
346 | | |
347 | 98 | _current_offset += len; |
348 | 98 | *eof = (_current_offset >= _file_size); |
349 | 98 | return Status::OK(); |
350 | 98 | } |
351 | | |
352 | 13 | Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) { |
353 | 13 | _schema_col_names.clear(); |
354 | 13 | _schema_col_types.clear(); |
355 | | |
356 | 110 | for (const auto& pcol_meta : pblock.column_metas()) { |
357 | 110 | DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta)); |
358 | 110 | VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name() |
359 | 0 | << ", type=" << type->get_name(); |
360 | 110 | _schema_col_names.emplace_back(pcol_meta.name()); |
361 | 110 | _schema_col_types.emplace_back(type); |
362 | 110 | } |
363 | 13 | _schema_inited = true; |
364 | 13 | return Status::OK(); |
365 | 13 | } |
366 | | |
367 | | #include "common/compile_check_end.h" |
368 | | |
369 | | } // namespace doris |