Coverage Report

Created: 2026-06-25 13:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/file_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <cstdint>
23
#include <map>
24
#include <memory>
25
#include <string>
26
#include <utility>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "core/data_type/data_type.h"
31
#include "core/field.h"
32
#include "exprs/vexpr_fwd.h"
33
#include "format_v2/column_data.h"
34
#include "gen_cpp/PlanNodes_types.h"
35
#include "io/file_factory.h"
36
#include "io/fs/file_reader_writer_fwd.h"
37
38
namespace doris {
39
class Block;
40
class ColumnPredicate;
41
struct ConditionCacheContext;
42
43
namespace io {
44
struct IOContext;
45
} // namespace io
46
} // namespace doris
47
48
namespace doris::format {
49
50
class TableColumnMapper;
51
struct TableColumnMapperOptions;
52
53
// One step of a STRUCT field chain that file-layer pruning descends into.
//
// Only STRUCT nesting is modelled here on purpose. LIST/MAP/repeated predicates need explicit
// quantified semantics, so they must not be encoded with this type.
struct FileStructPredicateTarget {
    // File-local id of the struct child selected at this step; -1 when default-constructed.
    int32_t file_local_id = -1;
    // Name of that child (presumably as spelled in the file schema -- confirm with the mapper).
    std::string file_child_name;
    // Next step of the chain; null when there is no deeper step.
    std::unique_ptr<FileStructPredicateTarget> child;

    FileStructPredicateTarget() = default;

    // Builds one step of the chain. `child_name` and `nested_child` are taken by value and moved
    // into the members, so callers can hand over ownership without an extra copy.
    FileStructPredicateTarget(int32_t local_id, std::string child_name,
                              std::unique_ptr<FileStructPredicateTarget> nested_child = nullptr)
            : file_local_id {local_id},
              file_child_name {std::move(child_name)},
              child {std::move(nested_child)} {}

    // Copy operations are declared here and defined out of line; the implicit ones would be
    // unavailable because of the unique_ptr member.
    FileStructPredicateTarget(const FileStructPredicateTarget& other);
    FileStructPredicateTarget& operator=(const FileStructPredicateTarget& other);

    // Moves simply transfer ownership of the remaining chain.
    FileStructPredicateTarget(FileStructPredicateTarget&& other) noexcept = default;
    FileStructPredicateTarget& operator=(FileStructPredicateTarget&& other) noexcept = default;
};
73
74
struct FileNestedPredicateTarget {
75
    LocalColumnId file_column_id = LocalColumnId::invalid();
76
    // Null means the predicate targets the top-level primitive column itself.
77
    std::unique_ptr<FileStructPredicateTarget> struct_target;
78
79
136
    FileNestedPredicateTarget() = default;
80
19
    explicit FileNestedPredicateTarget(LocalColumnId column_id) : file_column_id(column_id) {}
81
    FileNestedPredicateTarget(LocalColumnId column_id,
82
                              std::unique_ptr<FileStructPredicateTarget> target)
83
18
            : file_column_id(column_id), struct_target(std::move(target)) {}
84
    FileNestedPredicateTarget(const FileNestedPredicateTarget& other);
85
    FileNestedPredicateTarget& operator=(const FileNestedPredicateTarget& other);
86
101
    FileNestedPredicateTarget(FileNestedPredicateTarget&& other) noexcept = default;
87
37
    FileNestedPredicateTarget& operator=(FileNestedPredicateTarget&& other) noexcept = default;
88
89
353
    bool is_valid() const { return file_column_id.is_valid(); }
90
};
91
92
// File-local single-column predicates for file-layer pruning, such as min/max, page index,
93
// dictionary and bloom filter.
94
//
95
// Predicates must all belong to target.file_column_id. target.struct_target points to the nested
96
// primitive leaf under that root; null means the top-level column itself is the primitive leaf.
97
// These predicates are pruning hints only and are not row-level conjuncts.
98
struct FileColumnPredicateFilter {
99
    FileNestedPredicateTarget target;
100
    // Compatibility fields for call sites and tests that still construct pruning filters directly.
101
    // New mapper code should fill target; file readers consume target first and only fall back to
102
    // these fields while the API migration is in progress.
103
    LocalColumnId file_column_id = LocalColumnId::invalid();
104
    std::vector<int32_t> file_child_id_path;
105
    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
106
107
    LocalColumnId effective_file_column_id() const;
108
    std::vector<int32_t> effective_file_child_id_path() const;
109
    bool same_target_as(const FileColumnPredicateFilter& other) const;
110
    std::string debug_string() const;
111
};
112
113
// File formats distinguished by this layer. Enumerator order is kept as declared because the
// implicit underlying values follow it.
enum class FileFormat {
    PARQUET,
    ORC,
    CSV,
    JSON,
    TEXT,
    JNI,
};
121
122
// 通用文件层 scan 请求。
123
// 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global
124
// schema。所有 schema change、filter localization、default/generated/partition
125
// 列都应在 table 层完成。
126
struct FileScanRequest {
127
254
    virtual ~FileScanRequest() = default;
128
129
    std::string debug_string() const;
130
131
    // Columns that must be read before row-level filtering. They are materialized eagerly because
132
    // conjuncts/delete_conjuncts need them to decide the selected rows.
133
    std::vector<LocalColumnIndex> predicate_columns;
134
    // Columns read after row-level filtering. Predicate columns are also available for output and
135
    // should not be duplicated here.
136
    std::vector<LocalColumnIndex> non_predicate_columns;
137
    // file-local column id -> file-local output block position.
138
    std::map<LocalColumnId, LocalIndex> local_positions;
139
    // Row-level filters converted to file-local expressions from table-level predicates.
140
    VExprContextSPtrs conjuncts;
141
    // Delete predicates converted to file-local expressions.
142
    VExprContextSPtrs delete_conjuncts;
143
    // Single-column predicates used only for file-layer pruning, such as statistics, page index,
144
    // dictionary and bloom filter. They must not be used for batch row-level filtering.
145
    std::vector<FileColumnPredicateFilter> column_predicate_filters;
146
};
147
148
// Helper for constructing the scan-column layout in FileScanRequest.
149
//
150
// FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such
151
// as Parquet can read predicate columns first, filter rows, and then lazily read the remaining
152
// projected columns. The two lists still share one file-local output block, whose positions are
153
// stored in local_positions. This builder centralizes the mechanical rules for that shared layout:
154
// - each root file column gets one stable block position;
155
// - predicate columns dominate non-predicate columns because they are already returned in the file
156
//   block and can be reused for final materialization;
157
// - repeated nested projections for the same root are merged instead of duplicated.
158
//
159
// TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the
160
// FileScanRequest layout contract after a file-local projection has been produced.
161
class FileScanRequestBuilder {
162
public:
163
176
    explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) {
164
176
        DORIS_CHECK(_request != nullptr);
165
176
    }
166
167
75
    Status add_predicate_column(LocalColumnIndex projection) {
168
75
        return _add_column(std::move(projection), &_request->predicate_columns,
169
75
                           /*is_predicate_column=*/true);
170
75
    }
171
172
104
    Status add_non_predicate_column(LocalColumnIndex projection) {
173
104
        return _add_column(std::move(projection), &_request->non_predicate_columns,
174
104
                           /*is_predicate_column=*/false);
175
104
    }
176
177
14
    Status add_predicate_column(LocalColumnId column_id) {
178
14
        return add_predicate_column(LocalColumnIndex::top_level(column_id));
179
14
    }
180
181
13
    Status add_non_predicate_column(LocalColumnId column_id) {
182
13
        return add_non_predicate_column(LocalColumnIndex::top_level(column_id));
183
13
    }
184
185
private:
186
168
    static LocalIndex _next_block_position(const FileScanRequest& request) {
187
168
        size_t next_position = 0;
188
168
        for (const auto& [_, block_position] : request.local_positions) {
189
65
            next_position = std::max(next_position, block_position.value() + 1);
190
65
        }
191
168
        return LocalIndex(next_position);
192
168
    }
193
194
231
    static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) {
195
231
        DORIS_CHECK(projection != nullptr);
196
231
        if (projection->project_all_children) {
197
197
            return;
198
197
        }
199
50
        for (auto& child : projection->children) {
200
50
            _sort_projection_children_by_file_id(&child);
201
50
        }
202
34
        std::ranges::sort(projection->children,
203
34
                          [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) {
204
23
                              return lhs.local_id() < rhs.local_id();
205
23
                          });
206
34
    }
207
208
    Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns,
209
179
                       bool is_predicate_column) {
210
179
        DORIS_CHECK(scan_columns != nullptr);
211
179
        const auto file_column_id = projection.column_id();
212
179
        DORIS_CHECK(file_column_id != LocalColumnId::invalid());
213
179
        if (!is_predicate_column &&
214
179
            std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) {
215
5
                return p.column_id() == file_column_id;
216
5
            }) != _request->predicate_columns.end()) {
217
2
            return Status::OK();
218
2
        }
219
177
        if (!_request->local_positions.contains(file_column_id)) {
220
168
            _request->local_positions.emplace(file_column_id, _next_block_position(*_request));
221
168
        }
222
223
177
        _sort_projection_children_by_file_id(&projection);
224
177
        auto existing_projection_it = std::ranges::find_if(
225
177
                *scan_columns,
226
177
                [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
227
177
        if (existing_projection_it == scan_columns->end()) {
228
173
            scan_columns->push_back(std::move(projection));
229
173
        } else {
230
4
            RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection));
231
4
            _sort_projection_children_by_file_id(&*existing_projection_it);
232
4
        }
233
234
177
        if (is_predicate_column) {
235
75
            auto it = std::ranges::find_if(
236
75
                    _request->non_predicate_columns,
237
75
                    [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
238
75
            if (it != _request->non_predicate_columns.end()) {
239
4
                _request->non_predicate_columns.erase(it);
240
4
            }
241
75
        }
242
177
        return Status::OK();
243
177
    }
244
245
    FileScanRequest* _request = nullptr;
246
};
247
248
struct FileAggregateRequest {
249
    struct Column {
250
        // File-local projection for the aggregate column. For nested MIN/MAX, this points to the
251
        // single primitive leaf that can be represented by file statistics.
252
        LocalColumnIndex projection;
253
    };
254
255
    TPushAggOp::type agg_type = TPushAggOp::type::NONE;
256
    std::vector<Column> columns;
257
};
258
259
struct FileAggregateResult {
260
    struct Column {
261
        // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned
262
        // aggregate value back into the matching projected nested shape.
263
        LocalColumnIndex projection;
264
        bool has_min = false;
265
        bool has_max = false;
266
        Field min_value;
267
        Field max_value;
268
    };
269
270
    int64_t count = 0;
271
    std::vector<Column> columns;
272
};
273
274
// 文件物理读取层通用接口。
275
// 该接口只描述 file-local schema、file-local scan request 和 file-local block。
276
// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
277
/**
278
 *                                +-----> get_schema() -----------------+
279
 * FileReader() -----> init() ----|                                      -----> close()
280
 *                                +-----> open() -----> get_block() ----+
281
 */
282
class FileReader {
283
public:
284
    struct ReaderStatistics {
285
        int32_t filtered_row_groups = 0;
286
        int32_t filtered_row_groups_by_min_max = 0;
287
        int32_t filtered_row_groups_by_bloom_filter = 0;
288
        int32_t read_row_groups = 0;
289
        int64_t filtered_group_rows = 0;
290
        int64_t filtered_page_rows = 0;
291
        int64_t lazy_read_filtered_rows = 0;
292
        int64_t read_rows = 0;
293
        int64_t filtered_bytes = 0;
294
        int64_t column_read_time = 0;
295
        int64_t parse_meta_time = 0;
296
        int64_t parse_footer_time = 0;
297
        int64_t file_footer_read_calls = 0;
298
        int64_t file_footer_hit_cache = 0;
299
        int64_t file_reader_create_time = 0;
300
        int64_t open_file_num = 0;
301
        int64_t row_group_filter_time = 0;
302
        int64_t page_index_filter_time = 0;
303
        int64_t read_page_index_time = 0;
304
        int64_t parse_page_index_time = 0;
305
        int64_t predicate_filter_time = 0;
306
        int64_t dict_filter_rewrite_time = 0;
307
        int64_t bloom_filter_read_time = 0;
308
    };
309
310
    FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
311
               std::unique_ptr<io::FileDescription>& file_description,
312
               std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
313
175
            : _system_properties(system_properties),
314
175
              _file_description(std::move(file_description)),
315
175
              _io_ctx(io_ctx),
316
175
              _profile(profile) {}
317
175
    virtual ~FileReader() = default;
318
319
    // Initialize file reader and parse file metadata.
320
    virtual Status init(RuntimeState* state);
321
322
    // Get semantic file-local schema from file metadata. The file schema is determined by file
323
    // format and file content, and does not contain table/global schema semantics. A file reader may
324
    // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier,
325
    // but it must not interpret table-format semantics such as Iceberg name mapping,
326
    // default/generated columns, or partition columns. File-format physical wrappers should be
327
    // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value
328
    // children rather than key_value/entry.
329
    //
330
    // Doris plans external-table scan types as nullable, including all nested children of complex
331
    // types. This protects Doris from illegal or inconsistent values produced by external systems.
332
    // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must
333
    // also expose nullable child types recursively, even if the physical file marks those fields as
334
    // required.
335
    //
336
    // This method can only be called after init() successfully, but does not require open() to be
337
    // called.
338
    virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0;
339
340
    // Create the mapper that matches this reader's scan-request capabilities. TableReader still
341
    // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and
342
    // default expressions; the FileReader only chooses whether file-local requests support columnar
343
    // lazy materialization/pruning or must materialize one flat list of required columns.
344
    virtual std::unique_ptr<TableColumnMapper> create_column_mapper(
345
            TableColumnMapperOptions options) const;
346
347
    // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully.
348
161
    virtual Status open(std::shared_ptr<FileScanRequest> request) {
349
161
        _request = std::move(request);
350
161
        return Status::OK();
351
161
    }
352
353
    // 读取下一批 file-local block。
354
    // 该方法只能在 open(FileScanRequest) 成功后调用。
355
    // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。
356
    // rows 返回当前批次输出行数;eof 表示当前文件 reader 是否读完;多文件切换由
357
    // TableReader 负责。
358
0
    virtual Status get_block(Block* file_block, size_t* rows, bool* eof) {
359
        // stub 默认立即 EOF。
360
0
        if (rows != nullptr) {
361
0
            *rows = 0;
362
0
        }
363
0
        if (eof != nullptr) {
364
0
            *eof = true;
365
0
        }
366
0
        _eof = true;
367
0
        return Status::OK();
368
0
    }
369
370
    virtual Status get_aggregate_result(const FileAggregateRequest& request,
371
0
                                        FileAggregateResult* result) {
372
0
        return Status::NotSupported("FileReader does not support aggregate pushdown");
373
0
    }
374
375
    // Condition cache is managed by TableReader and consumed by physical file readers.
376
    // On cache HIT, readers may skip granules whose cached bit is false before doing column IO.
377
    // On cache MISS, readers mark a granule true when row-level predicates keep at least one row
378
    // in that granule. Readers that cannot map batch rows to stable file-global row ids should
379
    // keep the default no-op implementation.
380
0
    virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}
381
382
    // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap.
383
    // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable.
384
0
    virtual int64_t get_total_rows() const { return 0; }
385
386
    // 关闭当前物理文件 reader 并释放文件层状态。
387
    // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
388
67
    virtual Status close() {
389
67
        _file_reader.reset();
390
67
        _tracing_file_reader.reset();
391
67
        _io_ctx.reset();
392
67
        _eof = true;
393
67
        return Status::OK();
394
67
    }
395
396
protected:
397
0
    virtual void _init_profile() {}
398
399
    io::FileReaderSPtr _file_reader;
400
    // _tracing_file_reader wraps _file_reader.
401
    // _file_reader is original file reader.
402
    // _tracing_file_reader is tracing file reader with io context.
403
    // If io_ctx is null, _tracing_file_reader will be the same as file_reader.
404
    io::FileReaderSPtr _tracing_file_reader = nullptr;
405
    std::shared_ptr<FileScanRequest> _request;
406
    bool _eof = true;
407
    ReaderStatistics _reader_statistics;
408
    std::shared_ptr<io::FileSystemProperties> _system_properties;
409
    std::unique_ptr<io::FileDescription> _file_description;
410
    std::shared_ptr<io::IOContext> _io_ctx;
411
    RuntimeProfile* _profile = nullptr;
412
};
413
414
} // namespace doris::format