Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/native/native_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/native/native_reader.h"
19
20
#include <gen_cpp/data.pb.h>
21
22
#include "core/block/block.h"
23
#include "format/native/native_format.h"
24
#include "io/file_factory.h"
25
#include "io/fs/buffered_reader.h"
26
#include "io/fs/file_reader.h"
27
#include "io/fs/tracing_file_reader.h"
28
#include "runtime/runtime_profile.h"
29
#include "runtime/runtime_state.h"
30
31
namespace doris {
32
33
#include "common/compile_check_begin.h"
34
35
NativeReader::NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
36
                           const TFileRangeDesc& range, io::IOContext* io_ctx, RuntimeState* state)
37
14
        : _profile(profile),
38
14
          _scan_params(params),
39
14
          _scan_range(range),
40
14
          _io_ctx(io_ctx),
41
14
          _state(state) {}
42
43
14
NativeReader::~NativeReader() {
44
14
    (void)close();
45
14
}
46
47
namespace {
48
49
Status validate_and_consume_header(io::FileReaderSPtr file_reader, const TFileRangeDesc& range,
50
14
                                   int64_t* file_size, int64_t* current_offset, bool* eof) {
51
14
    *file_size = file_reader->size();
52
14
    *current_offset = 0;
53
14
    *eof = (*file_size == 0);
54
55
    // Validate and consume Doris Native file header.
56
    // Expected layout:
57
    // [magic bytes "DORISN1\0"][uint32_t format_version][uint64_t block_size]...
58
14
    static constexpr size_t HEADER_SIZE = sizeof(DORIS_NATIVE_MAGIC) + sizeof(uint32_t);
59
14
    if (*eof || *file_size < static_cast<int64_t>(HEADER_SIZE)) {
60
0
        return Status::InternalError(
61
0
                "invalid Doris Native file {}, file size {} is smaller than header size {}",
62
0
                range.path, *file_size, HEADER_SIZE);
63
0
    }
64
65
14
    char header[HEADER_SIZE];
66
14
    Slice header_slice(header, sizeof(header));
67
14
    size_t bytes_read = 0;
68
14
    RETURN_IF_ERROR(file_reader->read_at(0, header_slice, &bytes_read));
69
14
    if (bytes_read != sizeof(header)) {
70
0
        return Status::InternalError(
71
0
                "failed to read Doris Native header from file {}, expect {} bytes, got {} bytes",
72
0
                range.path, sizeof(header), bytes_read);
73
0
    }
74
75
14
    if (memcmp(header, DORIS_NATIVE_MAGIC, sizeof(DORIS_NATIVE_MAGIC)) != 0) {
76
0
        return Status::InternalError("invalid Doris Native magic header in file {}", range.path);
77
0
    }
78
79
14
    uint32_t version = 0;
80
14
    memcpy(&version, header + sizeof(DORIS_NATIVE_MAGIC), sizeof(uint32_t));
81
14
    if (version != DORIS_NATIVE_FORMAT_VERSION) {
82
0
        return Status::InternalError(
83
0
                "unsupported Doris Native format version {} in file {}, expect {}", version,
84
0
                range.path, DORIS_NATIVE_FORMAT_VERSION);
85
0
    }
86
87
14
    *current_offset = sizeof(header);
88
14
    *eof = (*file_size == *current_offset);
89
14
    return Status::OK();
90
14
}
91
92
} // namespace
93
94
108
Status NativeReader::init_reader() {
95
108
    if (_file_reader != nullptr) {
96
94
        return Status::OK();
97
94
    }
98
99
    // Create underlying file reader. For now we always use random access mode.
100
14
    io::FileSystemProperties system_properties;
101
14
    io::FileDescription file_description;
102
14
    file_description.file_size = -1;
103
14
    if (_scan_range.__isset.file_size) {
104
14
        file_description.file_size = _scan_range.file_size;
105
14
    }
106
14
    file_description.path = _scan_range.path;
107
14
    if (_scan_range.__isset.fs_name) {
108
0
        file_description.fs_name = _scan_range.fs_name;
109
0
    }
110
14
    if (_scan_range.__isset.modification_time) {
111
5
        file_description.mtime = _scan_range.modification_time;
112
9
    } else {
113
9
        file_description.mtime = 0;
114
9
    }
115
116
14
    if (_scan_range.__isset.file_type) {
117
        // For compatibility with older FE.
118
13
        system_properties.system_type = _scan_range.file_type;
119
13
    } else {
120
1
        system_properties.system_type = _scan_params.file_type;
121
1
    }
122
14
    system_properties.properties = _scan_params.properties;
123
14
    system_properties.hdfs_params = _scan_params.hdfs_params;
124
14
    if (_scan_params.__isset.broker_addresses) {
125
1
        system_properties.broker_addresses.assign(_scan_params.broker_addresses.begin(),
126
1
                                                  _scan_params.broker_addresses.end());
127
1
    }
128
129
14
    io::FileReaderOptions reader_options =
130
14
            FileFactory::get_reader_options(_state, file_description);
131
14
    auto reader_res = io::DelegateReader::create_file_reader(
132
14
            _profile, system_properties, file_description, reader_options,
133
14
            io::DelegateReader::AccessMode::RANDOM, _io_ctx);
134
14
    if (!reader_res.has_value()) {
135
0
        return reader_res.error();
136
0
    }
137
14
    _file_reader = reader_res.value();
138
139
14
    if (_io_ctx) {
140
5
        _file_reader =
141
5
                std::make_shared<io::TracingFileReader>(_file_reader, _io_ctx->file_reader_stats);
142
5
    }
143
144
14
    RETURN_IF_ERROR(validate_and_consume_header(_file_reader, _scan_range, &_file_size,
145
14
                                                &_current_offset, &_eof));
146
14
    return Status::OK();
147
14
}
148
149
104
Status NativeReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
150
104
    if (_eof) {
151
8
        *read_rows = 0;
152
8
        *eof = true;
153
8
        return Status::OK();
154
8
    }
155
156
96
    RETURN_IF_ERROR(init_reader());
157
158
96
    std::string buff;
159
96
    bool local_eof = false;
160
161
    // If we have already loaded the first block for schema probing, use it first.
162
96
    if (_first_block_loaded && !_first_block_consumed) {
163
3
        buff = _first_block_buf;
164
3
        local_eof = false;
165
93
    } else {
166
93
        RETURN_IF_ERROR(_read_next_pblock(&buff, &local_eof));
167
93
    }
168
169
    // If we reach EOF and also read no data for this call, the whole file is considered finished.
170
96
    if (local_eof && buff.empty()) {
171
1
        *read_rows = 0;
172
1
        *eof = true;
173
1
        _eof = true;
174
1
        return Status::OK();
175
1
    }
176
    // If buffer is empty but we have not reached EOF yet, treat this as an error.
177
95
    if (buff.empty()) {
178
0
        return Status::InternalError("read empty native block from file {}", _scan_range.path);
179
0
    }
180
181
95
    PBlock pblock;
182
95
    if (!pblock.ParseFromArray(buff.data(), static_cast<int>(buff.size()))) {
183
0
        return Status::InternalError("Failed to parse native PBlock from file {}",
184
0
                                     _scan_range.path);
185
0
    }
186
187
    // Initialize schema from first block if not done yet.
188
95
    if (!_schema_inited) {
189
7
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
190
7
    }
191
192
95
    size_t uncompressed_bytes = 0;
193
95
    int64_t decompress_time = 0;
194
95
    RETURN_IF_ERROR(block->deserialize(pblock, &uncompressed_bytes, &decompress_time));
195
196
    // For external file scan / TVF scenarios, unify all columns as nullable to match
197
    // GenericReader/SlotDescriptor convention. This ensures schema consistency when
198
    // some writers emit non-nullable columns.
199
677
    for (size_t i = 0; i < block->columns(); ++i) {
200
582
        auto& col_with_type = block->get_by_position(i);
201
582
        if (!col_with_type.type->is_nullable()) {
202
16
            col_with_type.column = make_nullable(col_with_type.column);
203
16
            col_with_type.type = make_nullable(col_with_type.type);
204
16
        }
205
582
    }
206
207
95
    *read_rows = block->rows();
208
95
    *eof = false;
209
210
95
    if (_first_block_loaded && !_first_block_consumed) {
211
3
        _first_block_consumed = true;
212
3
    }
213
214
    // If we reached the physical end of file, mark eof for subsequent calls.
215
95
    if (_current_offset >= _file_size) {
216
10
        _eof = true;
217
10
    }
218
219
95
    return Status::OK();
220
95
}
221
222
Status NativeReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
223
4
                                 std::unordered_set<std::string>* missing_cols) {
224
4
    missing_cols->clear();
225
4
    RETURN_IF_ERROR(init_reader());
226
227
4
    if (!_schema_inited) {
228
        // Load first block lazily to initialize schema.
229
4
        if (!_first_block_loaded) {
230
4
            bool local_eof = false;
231
4
            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
232
            // Treat file as empty only if we reach EOF and there is no block data at all.
233
4
            if (local_eof && _first_block_buf.empty()) {
234
0
                return Status::EndOfFile("empty native file {}", _scan_range.path);
235
0
            }
236
            // Non-EOF but empty buffer means corrupted native file.
237
4
            if (_first_block_buf.empty()) {
238
0
                return Status::InternalError("first native block is empty {}", _scan_range.path);
239
0
            }
240
4
            _first_block_loaded = true;
241
4
        }
242
243
4
        PBlock pblock;
244
4
        if (!pblock.ParseFromArray(_first_block_buf.data(),
245
4
                                   static_cast<int>(_first_block_buf.size()))) {
246
0
            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
247
0
                                         _scan_range.path);
248
0
        }
249
4
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
250
4
    }
251
252
22
    for (size_t i = 0; i < _schema_col_names.size(); ++i) {
253
18
        name_to_type->emplace(_schema_col_names[i], _schema_col_types[i]);
254
18
    }
255
4
    return Status::OK();
256
4
}
257
258
2
Status NativeReader::init_schema_reader() {
259
2
    RETURN_IF_ERROR(init_reader());
260
2
    return Status::OK();
261
2
}
262
263
Status NativeReader::get_parsed_schema(std::vector<std::string>* col_names,
264
3
                                       std::vector<DataTypePtr>* col_types) {
265
3
    RETURN_IF_ERROR(init_reader());
266
267
3
    if (!_schema_inited) {
268
2
        if (!_first_block_loaded) {
269
2
            bool local_eof = false;
270
2
            RETURN_IF_ERROR(_read_next_pblock(&_first_block_buf, &local_eof));
271
            // Treat file as empty only if we reach EOF and there is no block data at all.
272
2
            if (local_eof && _first_block_buf.empty()) {
273
0
                return Status::EndOfFile("empty native file {}", _scan_range.path);
274
0
            }
275
            // Non-EOF but empty buffer means corrupted native file.
276
2
            if (_first_block_buf.empty()) {
277
0
                return Status::InternalError("first native block is empty {}", _scan_range.path);
278
0
            }
279
2
            _first_block_loaded = true;
280
2
        }
281
282
2
        PBlock pblock;
283
2
        if (!pblock.ParseFromArray(_first_block_buf.data(),
284
2
                                   static_cast<int>(_first_block_buf.size()))) {
285
0
            return Status::InternalError("Failed to parse native PBlock for schema from file {}",
286
0
                                         _scan_range.path);
287
0
        }
288
2
        RETURN_IF_ERROR(_init_schema_from_pblock(pblock));
289
2
    }
290
291
3
    *col_names = _schema_col_names;
292
3
    *col_types = _schema_col_types;
293
3
    return Status::OK();
294
3
}
295
296
17
Status NativeReader::close() {
297
17
    _file_reader.reset();
298
17
    return Status::OK();
299
17
}
300
301
99
Status NativeReader::_read_next_pblock(std::string* buff, bool* eof) {
302
99
    *eof = false;
303
99
    buff->clear();
304
305
99
    if (_file_reader == nullptr) {
306
0
        RETURN_IF_ERROR(init_reader());
307
0
    }
308
309
99
    if (_current_offset >= _file_size) {
310
1
        *eof = true;
311
1
        return Status::OK();
312
1
    }
313
314
98
    uint64_t len = 0;
315
98
    Slice len_slice(reinterpret_cast<char*>(&len), sizeof(len));
316
98
    size_t bytes_read = 0;
317
98
    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, len_slice, &bytes_read));
318
98
    if (bytes_read == 0) {
319
0
        *eof = true;
320
0
        return Status::OK();
321
0
    }
322
98
    if (bytes_read != sizeof(len)) {
323
0
        return Status::InternalError(
324
0
                "Failed to read native block length from file {}, expect {}, "
325
0
                "actual {}",
326
0
                _scan_range.path, sizeof(len), bytes_read);
327
0
    }
328
329
98
    _current_offset += sizeof(len);
330
98
    if (len == 0) {
331
        // Empty block, nothing to read.
332
0
        *eof = (_current_offset >= _file_size);
333
0
        return Status::OK();
334
0
    }
335
336
98
    buff->assign(len, '\0');
337
98
    Slice data_slice(buff->data(), len);
338
98
    bytes_read = 0;
339
98
    RETURN_IF_ERROR(_file_reader->read_at(_current_offset, data_slice, &bytes_read));
340
98
    if (bytes_read != len) {
341
0
        return Status::InternalError(
342
0
                "Failed to read native block body from file {}, expect {}, "
343
0
                "actual {}",
344
0
                _scan_range.path, len, bytes_read);
345
0
    }
346
347
98
    _current_offset += len;
348
98
    *eof = (_current_offset >= _file_size);
349
98
    return Status::OK();
350
98
}
351
352
13
Status NativeReader::_init_schema_from_pblock(const PBlock& pblock) {
353
13
    _schema_col_names.clear();
354
13
    _schema_col_types.clear();
355
356
110
    for (const auto& pcol_meta : pblock.column_metas()) {
357
110
        DataTypePtr type = make_nullable(DataTypeFactory::instance().create_data_type(pcol_meta));
358
110
        VLOG_DEBUG << "init_schema_from_pblock, name=" << pcol_meta.name()
359
0
                   << ", type=" << type->get_name();
360
110
        _schema_col_names.emplace_back(pcol_meta.name());
361
110
        _schema_col_types.emplace_back(type);
362
110
    }
363
13
    _schema_inited = true;
364
13
    return Status::OK();
365
13
}
366
367
#include "common/compile_check_end.h"
368
369
} // namespace doris