be/src/format/native/native_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/native/native_reader.h" |
19 | | |
20 | | #include <gen_cpp/data.pb.h> |
21 | | |
22 | | #include "core/block/block.h" |
23 | | #include "format/native/native_format.h" |
24 | | #include "io/file_factory.h" |
25 | | #include "io/fs/buffered_reader.h" |
26 | | #include "io/fs/file_reader.h" |
27 | | #include "io/fs/tracing_file_reader.h" |
28 | | #include "runtime/runtime_profile.h" |
29 | | #include "runtime/runtime_state.h" |
30 | | |
31 | | namespace doris { |
32 | | |
33 | | NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params, |
34 | | const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state) |
35 | 9 | : _profile(profile), |
36 | 9 | _scan_params(params), |
37 | 9 | _scan_range(range), |
38 | 9 | _io_ctx(io_ctx), |
39 | 9 | _state(state) {} |
40 | | |
41 | 9 | NativeReader::~NativeReader() { |
42 | 9 | (void)close(); |
43 | 9 | } |
44 | | |
45 | | namespace { |
46 | | |
47 | | Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range, |
48 | 9 | int64_t* file_size, int64_t* current_offset, bool* eof) { |
49 | 9 | *file_size = file_reader->size(); |
50 | 9 | *current_offset = 0; |
51 | 9 | *eof = (*file_size == 0); |
52 | | |
53 | | // Validate and consume Doris Native file header. |
54 | | // Expected layout: |
55 | | // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]... |
56 | 9 | static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t); |
57 | 9 | if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) { |
58 | 0 | return Status::InternalError( |
59 | 0 | "invalid Doris Native file {}, file size {} is smaller than header size {}", |
60 | 0 | range.path, *file_size, HEADER_SIZE); |
61 | 0 | } |
62 | | |
63 | 9 | char header[HEADER_SIZE]; |
64 | 9 | Slice header_slice(header, sizeof(header)); |
65 | 9 | size_t bytes_read = 0; |
66 | 9 | RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read)); |
67 | 9 | if (bytes_read != sizeof(header)) { |
68 | 0 | return Status::InternalError( |
69 | 0 | "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes", |
70 | 0 | range.path, sizeof(header), bytes_read); |
71 | 0 | } |
72 | | |
73 | 9 | if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) { |
74 | 0 | return Status::InternalError("invalid Doris Native magic header in file {}", range.path); |
75 | 0 | } |
76 | | |
77 | 9 | uint32_t version = 0; |
78 | 9 | memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t)); |
79 | 9 | if (version != DORIS_NATIVE_FORMAT_VERSION) { |
80 | 0 | return Status::InternalError( |
81 | 0 | "unsupported Doris Native format version {} in file {}, expect {}", version, |
82 | 0 | range.path, DORIS_NATIVE_FORMAT_VERSION); |
83 | 0 | } |
84 | | |
85 | 9 | *current_offset = sizeof(header); |
86 | 9 | *eof = (*file_size == *current_offset); |
87 | 9 | return Status::OK(); |
88 | 9 | } |
89 | | |
90 | | } // namespace |
91 | | |
92 | 88 | Status NativeReader::init_reader() { |
93 | 88 | if (_file_reader != nullptr) { |
94 | 79 | return Status::OK(); |
95 | 79 | } |
96 | | |
97 | | // Create underlying file reader. For now we always use random access mode. |
98 | 9 | io::FileSystemProperties system_properties; |
99 | 9 | io::FileDescription file_description; |
100 | 9 | file_description.file_size = -1; |
101 | 9 | if (_scan_range.__isset.file_size) { |
102 | 9 | file_description.file_size = _scan_range.file_size; |
103 | 9 | } |
104 | 9 | file_description.path = _scan_range.path; |
105 | 9 | if (_scan_range.__isset.fs_name) { |
106 | 0 | file_description.fs_name = _scan_range.fs_name; |
107 | 0 | } |
108 | 9 | if (_scan_range.__isset.modification_time) { |
109 | 0 | file_description.mtime = _scan_range.modification_time; |
110 | 9 | } else { |
111 | 9 | file_description.mtime = 0; |
112 | 9 | } |
113 | | |
114 | 9 | if (_scan_range.__isset.file_type) { |
115 | | // For compatibility with older FE. |
116 | 9 | system_properties.system_type = _scan_range.file_type; |
117 | 9 | } else { |
118 | 0 | system_properties.system_type = _scan_params.file_type; |
119 | 0 | } |
120 | 9 | system_properties.properties = _scan_params.properties; |
121 | 9 | system_properties.hdfs_params = _scan_params.hdfs_params; |
122 | 9 | if (_scan_params.__isset.broker_addresses) { |
123 | 0 | system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(), |
124 | 0 | _scan_params.broker_addresses.end()); |
125 | 0 | } |
126 | | |
127 | 9 | io::FileReaderOptions reader_options = |
128 | 9 | FileFactory::get_reader_options(_state, file_description); |
129 | 9 | auto reader_res = io::DelegateReader::create_file_reader( |
130 | 9 | _profile, system_properties, file_description, reader_options, |
131 | 9 | io::DelegateReader::AccessMode::RANDOM, _io_ctx); |
132 | 9 | if (!reader_res.has_value()) { |
133 | 0 | return reader_res.error(); |
134 | 0 | } |
135 | 9 | _file_reader = reader_res.value(); |
136 | | |
137 | 9 | if (_io_ctx) { |
138 | 0 | _file_reader = |
139 | 0 | std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats); |
140 | 0 | } |
141 | | |
142 | 9 | RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size, |
143 | 9 | &_current_offset, &_eof)); |
144 | 9 | return Status::OK(); |
145 | 9 | } |
146 | | |
147 | 91 | Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
148 | 91 | if (_eof) { |
149 | 5 | *read_rows = 0; |
150 | 5 | *eof = true; |
151 | 5 | return Status::OK(); |
152 | 5 | } |
153 | | |
154 | 86 | RETURN_IF_ERROR(init_reader()); |
155 | | |
156 | 86 | std::string buff; |
157 | 86 | bool local_eof = false; |
158 | | |
159 | | // If we have already loaded the first block for schema probing, use it first. |
160 | 86 | if (_first_block_loaded && !_first_block_consumed) { |
161 | 0 | buff = _first_block_buf; |
162 | 0 | local_eof = false; |
163 | 86 | } else { |
164 | 86 | RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof)); |
165 | 86 | } |
166 | | |
167 | | // If we reach EOF and also read no data for this call, the whole file is considered finished. |
168 | 86 | if (local_eof && buff.empty()) { |
169 | 1 | *read_rows = 0; |
170 | 1 | *eof = true; |
171 | 1 | _eof = true; |
172 | 1 | return Status::OK(); |
173 | 1 | } |
174 | | // If buffer is empty but we have not reached EOF yet, treat this as an error. |
175 | 85 | if (buff.empty()) { |
176 | 0 | return Status::InternalError("read empty native block from file {}", _scan_range.path); |
177 | 0 | } |
178 | | |
179 | 85 | PBlock pblock; |
180 | 85 | if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) { |
181 | 0 | return Status::InternalError("Failed to parse native PBlock from file {}", |
182 | 0 | _scan_range.path); |
183 | 0 | } |
184 | | |
185 | | // Initialize schema from first block if not done yet. |
186 | 85 | if (!_schema_inited) { |
187 | 7 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
188 | 7 | } |
189 | | |
190 | 85 | size_t uncompressed_bytes = 0; |
191 | 85 | int64_t decompress_time = 0; |
192 | 85 | RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time)); |
193 | | |
194 | | // For external file scan / TVF scenarios, unify all columns as nullable to match |
195 | | // GenericReader/SlotDescriptor convention. This ensures schema consistency when |
196 | | // some writers emit non-nullable columns. |
197 | 635 | for (size_t i = 0; i < block->columns(); ++i) { |
198 | 550 | auto& col_with_type = block->get_by_position(i); |
199 | 550 | if (!col_with_type.type->is_nullable()) { |
200 | 2 | col_with_type.column = make_nullable(col_with_type.column); |
201 | 2 | col_with_type.type = make_nullable(col_with_type.type); |
202 | 2 | } |
203 | 550 | } |
204 | | |
205 | 85 | *read_rows = block->rows(); |
206 | 85 | *eof = false; |
207 | | |
208 | 85 | if (_first_block_loaded && !_first_block_consumed) { |
209 | 0 | _first_block_consumed = true; |
210 | 0 | } |
211 | | |
212 | | // If we reached the physical end of file, mark eof for subsequent calls. |
213 | 85 | if (_current_offset >= _file_size) { |
214 | 7 | _eof = true; |
215 | 7 | } |
216 | | |
217 | 85 | return Status::OK(); |
218 | 85 | } |
219 | | |
220 | | Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
221 | 1 | std::unordered_set<std::string>* missing_cols) { |
222 | 1 | missing_cols->clear(); |
223 | 1 | RETURN_IF_ERROR(init_reader()); |
224 | | |
225 | 1 | if (!_schema_inited) { |
226 | | // Load first block lazily to initialize schema. |
227 | 1 | if (!_first_block_loaded) { |
228 | 1 | bool local_eof = false; |
229 | 1 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
230 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
231 | 1 | if (local_eof && _first_block_buf.empty()) { |
232 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
233 | 0 | } |
234 | | // Non-EOF but empty buffer means corrupted native file. |
235 | 1 | if (_first_block_buf.empty()) { |
236 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
237 | 0 | } |
238 | 1 | _first_block_loaded = true; |
239 | 1 | } |
240 | | |
241 | 1 | PBlock pblock; |
242 | 1 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
243 | 1 | static_cast<int>(_first_block_buf.size()))) { |
244 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
245 | 0 | _scan_range.path); |
246 | 0 | } |
247 | 1 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
248 | 1 | } |
249 | | |
250 | 7 | for (size_t i = 0; i < _schema_col_names.size(); ++i) { |
251 | 6 | name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]); |
252 | 6 | } |
253 | 1 | return Status::OK(); |
254 | 1 | } |
255 | | |
256 | 0 | Status NativeReader::init_schema_reader() { |
257 | 0 | RETURN_IF_ERROR(init_reader()); |
258 | 0 | return Status::OK(); |
259 | 0 | } |
260 | | |
261 | | Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names, |
262 | 1 | std::vector<DataTypePtr>* col_types) { |
263 | 1 | RETURN_IF_ERROR(init_reader()); |
264 | | |
265 | 1 | if (!_schema_inited) { |
266 | 0 | if (!_first_block_loaded) { |
267 | 0 | bool local_eof = false; |
268 | 0 | RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof)); |
269 | | // Treat file as empty only if we reach EOF and there is no block data at all. |
270 | 0 | if (local_eof && _first_block_buf.empty()) { |
271 | 0 | return Status::EndOfFile("empty native file {}", _scan_range.path); |
272 | 0 | } |
273 | | // Non-EOF but empty buffer means corrupted native file. |
274 | 0 | if (_first_block_buf.empty()) { |
275 | 0 | return Status::InternalError("first native block is empty {}", _scan_range.path); |
276 | 0 | } |
277 | 0 | _first_block_loaded = true; |
278 | 0 | } |
279 | | |
280 | 0 | PBlock pblock; |
281 | 0 | if (!pblock.ParseFromArray(_first_block_buf.data(), |
282 | 0 | static_cast<int>(_first_block_buf.size()))) { |
283 | 0 | return Status::InternalError("Failed to parse native PBlock for schema from file {}", |
284 | 0 | _scan_range.path); |
285 | 0 | } |
286 | 0 | RETURN_IF_ERROR(_init_schema_from_pblock(pblock)); |
287 | 0 | } |
288 | | |
289 | 1 | *col_names = _schema_col_names; |
290 | 1 | *col_types = _schema_col_types; |
291 | 1 | return Status::OK(); |
292 | 1 | } |
293 | | |
294 | 9 | Status NativeReader::close() { |
295 | 9 | _file_reader.reset(); |
296 | 9 | return Status::OK(); |
297 | 9 | } |
298 | | |
299 | 87 | Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) { |
300 | 87 | *eof = false; |
301 | 87 | buff->clear(); |
302 | | |
303 | 87 | if (_file_reader == nullptr) { |
304 | 0 | RETURN_IF_ERROR(init_reader()); |
305 | 0 | } |
306 | | |
307 | 87 | if (_current_offset >= _file_size) { |
308 | 1 | *eof = true; |
309 | 1 | return Status::OK(); |
310 | 1 | } |
311 | | |
312 | 86 | uint64_t len = 0; |
313 | 86 | Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len)); |
314 | 86 | size_t bytes_read = 0; |
315 | 86 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read)); |
316 | 86 | if (bytes_read == 0) { |
317 | 0 | *eof = true; |
318 | 0 | return Status::OK(); |
319 | 0 | } |
320 | 86 | if (bytes_read != sizeof(len)) { |
321 | 0 | return Status::InternalError( |
322 | 0 | "Failed to read native block length from file {}, expect {}, " |
323 | 0 | "actual {}", |
324 | 0 | _scan_range.path, sizeof(len), bytes_read); |
325 | 0 | } |
326 | | |
327 | 86 | _current_offset += sizeof(len); |
328 | 86 | if (len == 0) { |
329 | | // Empty block, nothing to read. |
330 | 0 | *eof = (_current_offset >= _file_size); |
331 | 0 | return Status::OK(); |
332 | 0 | } |
333 | | |
334 | 86 | buff->assign(len, '\0'); |
335 | 86 | Slice data_slice(buff->data(), len); |
336 | 86 | bytes_read = 0; |
337 | 86 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read)); |
338 | 86 | if (bytes_read != len) { |
339 | 0 | return Status::InternalError( |
340 | 0 | "Failed to read native block body from file {}, expect {}, " |
341 | 0 | "actual {}", |
342 | 0 | _scan_range.path, len, bytes_read); |
343 | 0 | } |
344 | | |
345 | 86 | _current_offset += len; |
346 | 86 | *eof = (_current_offset >= _file_size); |
347 | 86 | return Status::OK(); |
348 | 86 | } |
349 | | |
350 | 8 | Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) { |
351 | 8 | _schema_col_names.clear(); |
352 | 8 | _schema_col_types.clear(); |
353 | | |
354 | 88 | for (const auto& pcol_meta : pblock.column_metas()) { |
355 | 88 | DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta)); |
356 | 88 | VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name() |
357 | 0 | << ", type=" << type->get_name(); |
358 | 88 | _schema_col_names.emplace_back(pcol_meta.name()); |
359 | 88 | _schema_col_types.emplace_back(type); |
360 | 88 | } |
361 | 8 | _schema_inited = true; |
362 | 8 | return Status::OK(); |
363 | 8 | } |
364 | | |
365 | | } // namespace doris |