Coverage Report

Created: 2026-06-03 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/native/native_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/native/native_reader.h"
19
20
#include <gen_cpp/data.pb.h>
21
22
#include <utility>
23
24
#include "core/block/block.h"
25
#include "format/native/native_format.h"
26
#include "io/file_factory.h"
27
#include "io/fs/buffered_reader.h"
28
#include "io/fs/file_reader.h"
29
#include "io/fs/tracing_file_reader.h"
30
#include "runtime/runtime_profile.h"
31
#include "runtime/runtime_state.h"
32
33
namespace doris {
34
35
NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
36
                           const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state)
37
9
        : _profile(profile),
38
9
          _scan_params(params),
39
9
          _scan_range(range),
40
9
          _io_ctx(io_ctx),
41
9
          _state(state) {}
42
43
NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
44
                           const TFileRangeDesc& range,
45
                           std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* state)
46
0
        : _profile(profile),
47
0
          _scan_params(params),
48
0
          _scan_range(range),
49
0
          _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
50
0
          _io_ctx_holder(std::move(io_ctx_holder)),
51
0
          _state(state) {}
52
53
9
NativeReader::~NativeReader() {
54
9
    (void)close();
55
9
}
56
57
namespace {
58
59
Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range,
60
                                   int64_t* file_size, int64_t* current_offset, bool* eof,
61
9
                                   const io::IOContext* io_ctx) {
62
9
    *file_size = file_reader->size();
63
9
    *current_offset = 0;
64
9
    *eof = (*file_size == 0);
65
66
    // Validate and consume Doris Native file header.
67
    // Expected layout:
68
    // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]...
69
9
    static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t);
70
9
    if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) {
71
0
        return Status::InternalError(
72
0
                "invalid Doris Native file {}, file size {} is smaller than header size {}",
73
0
                range.path, *file_size, HEADER_SIZE);
74
0
    }
75
76
9
    char header[HEADER_SIZE];
77
9
    Slice header_slice(header, sizeof(header));
78
9
    size_t bytes_read = 0;
79
9
    RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read, io_ctx));
80
9
    if (bytes_read != sizeof(header)) {
81
0
        return Status::InternalError(
82
0
                "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes",
83
0
                range.path, sizeof(header), bytes_read);
84
0
    }
85
86
9
    if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) {
87
0
        return Status::InternalError("invalid Doris Native magic header in file {}", range.path);
88
0
    }
89
90
9
    uint32_t version = 0;
91
9
    memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t));
92
9
    if (version != DORIS_NATIVE_FORMAT_VERSION) {
93
0
        return Status::InternalError(
94
0
                "unsupported Doris Native format version {} in file {}, expect {}", version,
95
0
                range.path, DORIS_NATIVE_FORMAT_VERSION);
96
0
    }
97
98
9
    *current_offset = sizeof(header);
99
9
    *eof = (*file_size == *current_offset);
100
9
    return Status::OK();
101
9
}
102
103
} // namespace
104
105
88
Status NativeReader::init_reader() {
106
88
    if (_file_reader != nullptr) {
107
79
        return Status::OK();
108
79
    }
109
110
    // Create underlying file reader. For now we always use random access mode.
111
9
    io::FileSystemProperties system_properties;
112
9
    io::FileDescription file_description;
113
9
    file_description.file_size = -1;
114
9
    if (_scan_range.__isset.file_size) {
115
9
        file_description.file_size = _scan_range.file_size;
116
9
    }
117
9
    file_description.path = _scan_range.path;
118
9
    if (_scan_range.__isset.fs_name) {
119
0
        file_description.fs_name = _scan_range.fs_name;
120
0
    }
121
9
    if (_scan_range.__isset.modification_time) {
122
0
        file_description.mtime = _scan_range.modification_time;
123
9
    } else {
124
9
        file_description.mtime = 0;
125
9
    }
126
127
9
    if (_scan_range.__isset.file_type) {
128
        // For compatibility with older FE.
129
9
        system_properties.system_type = _scan_range.file_type;
130
9
    } else {
131
0
        system_properties.system_type = _scan_params.file_type;
132
0
    }
133
9
    system_properties.properties = _scan_params.properties;
134
9
    system_properties.hdfs_params = _scan_params.hdfs_params;
135
9
    if (_scan_params.__isset.broker_addresses) {
136
0
        system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
137
0
                                                  _scan_params.broker_addresses.end());
138
0
    }
139
140
9
    io::FileReaderOptions reader_options =
141
9
            FileFactory::get_reader_options(_state, file_description);
142
9
    auto reader_res =
143
9
            _io_ctx_holder ? io::DelegateReader::create_file_reader(
144
0
                                     _profile, system_properties, file_description, reader_options,
145
0
                                     io::DelegateReader::AccessMode::RANDOM,
146
0
                                     std::static_pointer_cast<const io::IOContext>(_io_ctx_holder))
147
9
                           : io::DelegateReader::create_file_reader(
148
9
                                     _profile, system_properties, file_description, reader_options,
149
9
                                     io::DelegateReader::AccessMode::RANDOM, _io_ctx);
150
9
    if (!reader_res.has_value()) {
151
0
        return reader_res.error();
152
0
    }
153
9
    _file_reader = reader_res.value();
154
155
9
    if (_io_ctx && _io_ctx->file_reader_stats) {
156
0
        _file_reader =
157
0
                std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats);
158
0
    }
159
160
9
    RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size,
161
9
                                                &_current_offset, &_eof, _io_ctx));
162
9
    return Status::OK();
163
9
}
164
165
91
Status NativeReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
166
91
    if (_eof) {
167
5
        *read_rows = 0;
168
5
        *eof = true;
169
5
        return Status::OK();
170
5
    }
171
172
86
    RETURN_IF_ERROR(init_reader());
173
174
86
    std::string buff;
175
86
    bool local_eof = false;
176
177
    // If we have already loaded the first block for schema probing, use it first.
178
86
    if (_first_block_loaded && !_first_block_consumed) {
179
0
        buff = _first_block_buf;
180
0
        local_eof = false;
181
86
    } else {
182
86
        RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
183
86
    }
184
185
    // If we reach EOF and also read no data for this call, the whole file is considered finished.
186
86
    if (local_eof && buff.empty()) {
187
1
        *read_rows = 0;
188
1
        *eof = true;
189
1
        _eof = true;
190
1
        return Status::OK();
191
1
    }
192
    // If buffer is empty but we have not reached EOF yet, treat this as an error.
193
85
    if (buff.empty()) {
194
0
        return Status::InternalError("read empty native block from file {}", _scan_range.path);
195
0
    }
196
197
85
    PBlock pblock;
198
85
    if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
199
0
        return Status::InternalError("Failed to parse native PBlock from file {}",
200
0
                                     _scan_range.path);
201
0
    }
202
203
    // Initialize schema from first block if not done yet.
204
85
    if (!_schema_inited) {
205
7
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
206
7
    }
207
208
85
    size_t uncompressed_bytes = 0;
209
85
    int64_t decompress_time = 0;
210
85
    RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time));
211
212
    // For external file scan / TVF scenarios, unify all columns as nullable to match
213
    // GenericReader/SlotDescriptor convention. This ensures schema consistency when
214
    // some writers emit non-nullable columns.
215
635
    for (size_t i = 0; i < block->columns(); ++i) {
216
550
        auto& col_with_type = block->get_by_position(i);
217
550
        if (!col_with_type.type->is_nullable()) {
218
2
            col_with_type.column = make_nullable(col_with_type.column);
219
2
            col_with_type.type = make_nullable(col_with_type.type);
220
2
        }
221
550
    }
222
223
85
    *read_rows = block->rows();
224
85
    *eof = false;
225
226
85
    if (_first_block_loaded && !_first_block_consumed) {
227
0
        _first_block_consumed = true;
228
0
    }
229
230
    // If we reached the physical end of file, mark eof for subsequent calls.
231
85
    if (_current_offset >= _file_size) {
232
7
        _eof = true;
233
7
    }
234
235
85
    return Status::OK();
236
85
}
237
238
1
Status NativeReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
239
1
    RETURN_IF_ERROR(init_reader());
240
241
1
    if (!_schema_inited) {
242
        // Load first block lazily to initialize schema.
243
1
        if (!_first_block_loaded) {
244
1
            bool local_eof = false;
245
1
            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
246
            // Treat file as empty only if we reach EOF and there is no block data at all.
247
1
            if (local_eof && _first_block_buf.empty()) {
248
0
                return Status::EndOfFile("empty native file {}", _scan_range.path);
249
0
            }
250
            // Non-EOF but empty buffer means corrupted native file.
251
1
            if (_first_block_buf.empty()) {
252
0
                return Status::InternalError("first native block is empty {}", _scan_range.path);
253
0
            }
254
1
            _first_block_loaded = true;
255
1
        }
256
257
1
        PBlock pblock;
258
1
        if (!pblock.ParseFromArray(_first_block_buf.data(),
259
1
                                   static_cast<int>(_first_block_buf.size()))) {
260
0
            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
261
0
                                         _scan_range.path);
262
0
        }
263
1
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
264
1
    }
265
266
7
    for (size_t i = 0; i < _schema_col_names.size(); ++i) {
267
6
        name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]);
268
6
    }
269
1
    return Status::OK();
270
1
}
271
272
0
Status NativeReader::init_schema_reader() {
273
0
    RETURN_IF_ERROR(init_reader());
274
0
    return Status::OK();
275
0
}
276
277
Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
278
1
                                       std::vector<DataTypePtr>* col_types) {
279
1
    RETURN_IF_ERROR(init_reader());
280
281
1
    if (!_schema_inited) {
282
0
        if (!_first_block_loaded) {
283
0
            bool local_eof = false;
284
0
            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
285
            // Treat file as empty only if we reach EOF and there is no block data at all.
286
0
            if (local_eof && _first_block_buf.empty()) {
287
0
                return Status::EndOfFile("empty native file {}", _scan_range.path);
288
0
            }
289
            // Non-EOF but empty buffer means corrupted native file.
290
0
            if (_first_block_buf.empty()) {
291
0
                return Status::InternalError("first native block is empty {}", _scan_range.path);
292
0
            }
293
0
            _first_block_loaded = true;
294
0
        }
295
296
0
        PBlock pblock;
297
0
        if (!pblock.ParseFromArray(_first_block_buf.data(),
298
0
                                   static_cast<int>(_first_block_buf.size()))) {
299
0
            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
300
0
                                         _scan_range.path);
301
0
        }
302
0
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
303
0
    }
304
305
1
    *col_names = _schema_col_names;
306
1
    *col_types = _schema_col_types;
307
1
    return Status::OK();
308
1
}
309
310
9
Status NativeReader::close() {
311
9
    _file_reader.reset();
312
9
    return Status::OK();
313
9
}
314
315
87
Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
316
87
    *eof = false;
317
87
    buff->clear();
318
319
87
    if (_file_reader == nullptr) {
320
0
        RETURN_IF_ERROR(init_reader());
321
0
    }
322
323
87
    if (_current_offset >= _file_size) {
324
1
        *eof = true;
325
1
        return Status::OK();
326
1
    }
327
328
86
    uint64_t len = 0;
329
86
    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
330
86
    size_t bytes_read = 0;
331
86
    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read, _io_ctx));
332
86
    if (bytes_read == 0) {
333
0
        *eof = true;
334
0
        return Status::OK();
335
0
    }
336
86
    if (bytes_read != sizeof(len)) {
337
0
        return Status::InternalError(
338
0
                "Failed to read native block length from file {}, expect {}, "
339
0
                "actual {}",
340
0
                _scan_range.path, sizeof(len), bytes_read);
341
0
    }
342
343
86
    _current_offset += sizeof(len);
344
86
    if (len == 0) {
345
        // Empty block, nothing to read.
346
0
        *eof = (_current_offset >= _file_size);
347
0
        return Status::OK();
348
0
    }
349
350
86
    buff->assign(len, '\0');
351
86
    Slice data_slice(buff->data(), len);
352
86
    bytes_read = 0;
353
86
    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read, _io_ctx));
354
86
    if (bytes_read != len) {
355
0
        return Status::InternalError(
356
0
                "Failed to read native block body from file {}, expect {}, "
357
0
                "actual {}",
358
0
                _scan_range.path, len, bytes_read);
359
0
    }
360
361
86
    _current_offset += len;
362
86
    *eof = (_current_offset >= _file_size);
363
86
    return Status::OK();
364
86
}
365
366
8
Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
367
8
    _schema_col_names.clear();
368
8
    _schema_col_types.clear();
369
370
88
    for (const auto& pcol_meta : pblock.column_metas()) {
371
88
        DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
372
88
        VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name()
373
0
                   << ", type=" << type->get_name();
374
88
        _schema_col_names.emplace_back(pcol_meta.name());
375
88
        _schema_col_types.emplace_back(type);
376
88
    }
377
8
    _schema_inited = true;
378
8
    return Status::OK();
379
8
}
380
381
} // namespace doris