Coverage Report

Created: 2026-06-27 16:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/file_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <cstdint>
23
#include <map>
24
#include <memory>
25
#include <string>
26
#include <utility>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "core/data_type/data_type.h"
31
#include "core/field.h"
32
#include "exprs/vexpr_fwd.h"
33
#include "format_v2/column_data.h"
34
#include "gen_cpp/PlanNodes_types.h"
35
#include "io/file_factory.h"
36
#include "io/fs/file_reader_writer_fwd.h"
37
38
namespace doris {
39
class Block;
40
class ColumnPredicate;
41
struct ConditionCacheContext;
42
43
namespace io {
44
struct IOContext;
45
} // namespace io
46
} // namespace doris
47
48
namespace doris::format {
49
50
class TableColumnMapper;
51
struct TableColumnMapperOptions;
52
53
// One link of a STRUCT field chain that file-layer pruning follows to reach a nested leaf.
//
// Only STRUCT nesting can be expressed with this type. Predicates over LIST/MAP/repeated fields
// need explicit quantified semantics, so they must not be encoded here.
struct FileStructPredicateTarget {
    // File-local id of the struct child this link selects; -1 when default-constructed.
    int32_t file_local_id = -1;
    // Child name paired with file_local_id.
    std::string file_child_name;
    // Next link of the chain; null when this link is the last one.
    std::unique_ptr<FileStructPredicateTarget> child;

    FileStructPredicateTarget() = default;

    // Builds one link and optionally attaches the remainder of the chain below it.
    FileStructPredicateTarget(int32_t id, std::string name,
                              std::unique_ptr<FileStructPredicateTarget> next = nullptr)
            : file_local_id(id), file_child_name(std::move(name)), child(std::move(next)) {}

    // Copying is user-provided because the unique_ptr member rules out the implicit copy
    // operations. The definitions live outside this header.
    FileStructPredicateTarget(const FileStructPredicateTarget& other);
    FileStructPredicateTarget& operator=(const FileStructPredicateTarget& other);

    // Moving transfers ownership of the whole chain.
    FileStructPredicateTarget(FileStructPredicateTarget&& other) noexcept = default;
    FileStructPredicateTarget& operator=(FileStructPredicateTarget&& other) noexcept = default;
};
73
74
struct FileNestedPredicateTarget {
75
    LocalColumnId file_column_id = LocalColumnId::invalid();
76
    // Null means the predicate targets the top-level primitive column itself.
77
    std::unique_ptr<FileStructPredicateTarget> struct_target;
78
79
144
    FileNestedPredicateTarget() = default;
80
23
    explicit FileNestedPredicateTarget(LocalColumnId column_id) : file_column_id(column_id) {}
81
    FileNestedPredicateTarget(LocalColumnId column_id,
82
                              std::unique_ptr<FileStructPredicateTarget> target)
83
18
            : file_column_id(column_id), struct_target(std::move(target)) {}
84
    FileNestedPredicateTarget(const FileNestedPredicateTarget& other);
85
    FileNestedPredicateTarget& operator=(const FileNestedPredicateTarget& other);
86
105
    FileNestedPredicateTarget(FileNestedPredicateTarget&& other) noexcept = default;
87
41
    FileNestedPredicateTarget& operator=(FileNestedPredicateTarget&& other) noexcept = default;
88
89
433
    bool is_valid() const { return file_column_id.is_valid(); }
90
};
91
92
// File-local single-column predicates for file-layer pruning, such as min/max, page index,
93
// dictionary and bloom filter.
94
//
95
// Predicates must all belong to target.file_column_id. target.struct_target points to the nested
96
// primitive leaf under that root; null means the top-level column itself is the primitive leaf.
97
// These predicates are pruning hints only and are not row-level conjuncts.
98
struct FileColumnPredicateFilter {
99
    FileNestedPredicateTarget target;
100
    // Compatibility fields for call sites and tests that still construct pruning filters directly.
101
    // New mapper code should fill target; file readers consume target first and only fall back to
102
    // these fields while the API migration is in progress.
103
    LocalColumnId file_column_id = LocalColumnId::invalid();
104
    std::vector<int32_t> file_child_id_path;
105
    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
106
107
    LocalColumnId effective_file_column_id() const;
108
    std::vector<int32_t> effective_file_child_id_path() const;
109
    bool same_target_as(const FileColumnPredicateFilter& other) const;
110
    std::string debug_string() const;
111
};
112
113
// File formats distinguished by the format layer. Enumerator values are implicit, so the
// declaration order fixes the underlying numbering.
enum class FileFormat {
    PARQUET,
    ORC,
    CSV,
    JSON,
    TEXT,
    JNI,
    NATIVE,
};
122
123
// 通用文件层 scan 请求。
124
// 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global
125
// schema。所有 schema change、filter localization、default/generated/partition
126
// 列都应在 table 层完成。
127
struct FileScanRequest {
128
731
    virtual ~FileScanRequest() = default;
129
130
    std::string debug_string() const;
131
132
    // Columns that must be read before row-level filtering. They are materialized eagerly because
133
    // conjuncts/delete_conjuncts need them to decide the selected rows.
134
    std::vector<LocalColumnIndex> predicate_columns;
135
    // Columns read after row-level filtering. Predicate columns are also available for output and
136
    // should not be duplicated here.
137
    std::vector<LocalColumnIndex> non_predicate_columns;
138
    // file-local column id -> file-local output block position.
139
    std::map<LocalColumnId, LocalIndex> local_positions;
140
    // Row-level filters converted to file-local expressions from table-level predicates.
141
    VExprContextSPtrs conjuncts;
142
    // Delete predicates converted to file-local expressions.
143
    VExprContextSPtrs delete_conjuncts;
144
    // Single-column predicates used only for file-layer pruning, such as statistics, page index,
145
    // dictionary and bloom filter. They must not be used for batch row-level filtering.
146
    std::vector<FileColumnPredicateFilter> column_predicate_filters;
147
};
148
149
// Helper for constructing the scan-column layout in FileScanRequest.
150
//
151
// FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such
152
// as Parquet can read predicate columns first, filter rows, and then lazily read the remaining
153
// projected columns. The two lists still share one file-local output block, whose positions are
154
// stored in local_positions. This builder centralizes the mechanical rules for that shared layout:
155
// - each root file column gets one stable block position;
156
// - predicate columns dominate non-predicate columns because they are already returned in the file
157
//   block and can be reused for final materialization;
158
// - repeated nested projections for the same root are merged instead of duplicated.
159
//
160
// TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the
161
// FileScanRequest layout contract after a file-local projection has been produced.
162
class FileScanRequestBuilder {
163
public:
164
3.25k
    explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) {
165
3.25k
        DORIS_CHECK(_request != nullptr);
166
3.25k
    }
167
168
79
    Status add_predicate_column(LocalColumnIndex projection) {
169
79
        return _add_column(std::move(projection), &_request->predicate_columns,
170
79
                           /*is_predicate_column=*/true);
171
79
    }
172
173
3.17k
    Status add_non_predicate_column(LocalColumnIndex projection) {
174
3.17k
        return _add_column(std::move(projection), &_request->non_predicate_columns,
175
3.17k
                           /*is_predicate_column=*/false);
176
3.17k
    }
177
178
14
    Status add_predicate_column(LocalColumnId column_id) {
179
14
        return add_predicate_column(LocalColumnIndex::top_level(column_id));
180
14
    }
181
182
18
    Status add_non_predicate_column(LocalColumnId column_id) {
183
18
        return add_non_predicate_column(LocalColumnIndex::top_level(column_id));
184
18
    }
185
186
private:
187
3.24k
    static LocalIndex _next_block_position(const FileScanRequest& request) {
188
3.24k
        size_t next_position = 0;
189
19.5k
        for (const auto& [_, block_position] : request.local_positions) {
190
19.5k
            next_position = std::max(next_position, block_position.value() + 1);
191
19.5k
        }
192
3.24k
        return LocalIndex(next_position);
193
3.24k
    }
194
195
3.31k
    static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) {
196
3.31k
        DORIS_CHECK(projection != nullptr);
197
3.31k
        if (projection->project_all_children) {
198
3.28k
            return;
199
3.28k
        }
200
50
        for (auto& child : projection->children) {
201
50
            _sort_projection_children_by_file_id(&child);
202
50
        }
203
34
        std::ranges::sort(projection->children,
204
34
                          [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) {
205
23
                              return lhs.local_id() < rhs.local_id();
206
23
                          });
207
34
    }
208
209
    Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns,
210
3.25k
                       bool is_predicate_column) {
211
3.25k
        DORIS_CHECK(scan_columns != nullptr);
212
3.25k
        const auto file_column_id = projection.column_id();
213
3.25k
        DORIS_CHECK(file_column_id != LocalColumnId::invalid());
214
3.25k
        if (!is_predicate_column &&
215
3.25k
            std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) {
216
5
                return p.column_id() == file_column_id;
217
5
            }) != _request->predicate_columns.end()) {
218
2
            return Status::OK();
219
2
        }
220
3.25k
        if (!_request->local_positions.contains(file_column_id)) {
221
3.24k
            _request->local_positions.emplace(file_column_id, _next_block_position(*_request));
222
3.24k
        }
223
224
3.25k
        _sort_projection_children_by_file_id(&projection);
225
3.25k
        auto existing_projection_it = std::ranges::find_if(
226
3.25k
                *scan_columns,
227
19.4k
                [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
228
3.25k
        if (existing_projection_it == scan_columns->end()) {
229
3.24k
            scan_columns->push_back(std::move(projection));
230
3.24k
        } else {
231
9
            RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection));
232
9
            _sort_projection_children_by_file_id(&*existing_projection_it);
233
9
        }
234
235
3.25k
        if (is_predicate_column) {
236
79
            auto it = std::ranges::find_if(
237
79
                    _request->non_predicate_columns,
238
79
                    [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
239
79
            if (it != _request->non_predicate_columns.end()) {
240
4
                _request->non_predicate_columns.erase(it);
241
4
            }
242
79
        }
243
3.25k
        return Status::OK();
244
3.25k
    }
245
246
    FileScanRequest* _request = nullptr;
247
};
248
249
struct FileAggregateRequest {
250
    struct Column {
251
        // File-local projection for the aggregate column. For nested MIN/MAX, this points to the
252
        // single primitive leaf that can be represented by file statistics.
253
        LocalColumnIndex projection;
254
    };
255
256
    TPushAggOp::type agg_type = TPushAggOp::type::NONE;
257
    std::vector<Column> columns;
258
};
259
260
struct FileAggregateResult {
261
    struct Column {
262
        // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned
263
        // aggregate value back into the matching projected nested shape.
264
        LocalColumnIndex projection;
265
        bool has_min = false;
266
        bool has_max = false;
267
        Field min_value;
268
        Field max_value;
269
    };
270
271
    int64_t count = 0;
272
    std::vector<Column> columns;
273
};
274
275
// 文件物理读取层通用接口。
276
// 该接口只描述 file-local schema、file-local scan request 和 file-local block。
277
// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
278
/**
279
 *                                +-----> get_schema() -----------------+
280
 * FileReader() -----> init() ----|                                      -----> close()
281
 *                                +-----> open() -----> get_block() ----+
282
 */
283
class FileReader {
284
public:
285
    struct ReaderStatistics {
286
        int32_t filtered_row_groups = 0;
287
        int32_t filtered_row_groups_by_min_max = 0;
288
        int32_t filtered_row_groups_by_bloom_filter = 0;
289
        int32_t read_row_groups = 0;
290
        int64_t filtered_group_rows = 0;
291
        int64_t filtered_page_rows = 0;
292
        int64_t lazy_read_filtered_rows = 0;
293
        int64_t read_rows = 0;
294
        int64_t filtered_bytes = 0;
295
        int64_t column_read_time = 0;
296
        int64_t parse_meta_time = 0;
297
        int64_t parse_footer_time = 0;
298
        int64_t file_footer_read_calls = 0;
299
        int64_t file_footer_hit_cache = 0;
300
        int64_t file_reader_create_time = 0;
301
        int64_t open_file_num = 0;
302
        int64_t row_group_filter_time = 0;
303
        int64_t page_index_filter_time = 0;
304
        int64_t read_page_index_time = 0;
305
        int64_t parse_page_index_time = 0;
306
        int64_t predicate_filter_time = 0;
307
        int64_t dict_filter_rewrite_time = 0;
308
        int64_t bloom_filter_read_time = 0;
309
    };
310
311
    FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
312
               std::unique_ptr<io::FileDescription>& file_description,
313
               std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
314
659
            : _system_properties(system_properties),
315
659
              _file_description(std::move(file_description)),
316
659
              _io_ctx(io_ctx),
317
659
              _profile(profile) {}
318
659
    virtual ~FileReader() = default;
319
320
    // Initialize file reader and parse file metadata.
321
    virtual Status init(RuntimeState* state);
322
323
    // Get semantic file-local schema from file metadata. The file schema is determined by file
324
    // format and file content, and does not contain table/global schema semantics. A file reader may
325
    // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier,
326
    // but it must not interpret table-format semantics such as Iceberg name mapping,
327
    // default/generated columns, or partition columns. File-format physical wrappers should be
328
    // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value
329
    // children rather than key_value/entry.
330
    //
331
    // Doris plans external-table scan types as nullable, including all nested children of complex
332
    // types. This protects Doris from illegal or inconsistent values produced by external systems.
333
    // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must
334
    // also expose nullable child types recursively, even if the physical file marks those fields as
335
    // required.
336
    //
337
    // This method can only be called after init() successfully, but does not require open() to be
338
    // called.
339
    virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0;
340
341
    // Create the mapper that matches this reader's scan-request capabilities. TableReader still
342
    // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and
343
    // default expressions; the FileReader only chooses whether file-local requests support columnar
344
    // lazy materialization/pruning or must materialize one flat list of required columns.
345
    virtual std::unique_ptr<TableColumnMapper> create_column_mapper(
346
            TableColumnMapperOptions options) const;
347
348
    // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully.
349
638
    virtual Status open(std::shared_ptr<FileScanRequest> request) {
350
638
        _request = std::move(request);
351
638
        return Status::OK();
352
638
    }
353
354
    // 读取下一批 file-local block。
355
    // 该方法只能在 open(FileScanRequest) 成功后调用。
356
    // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。
357
    // rows 返回当前批次输出行数;eof 表示当前文件 reader 是否读完;多文件切换由
358
    // TableReader 负责。
359
0
    virtual Status get_block(Block* file_block, size_t* rows, bool* eof) {
360
        // stub 默认立即 EOF。
361
0
        if (rows != nullptr) {
362
0
            *rows = 0;
363
0
        }
364
0
        if (eof != nullptr) {
365
0
            *eof = true;
366
0
        }
367
0
        _eof = true;
368
0
        return Status::OK();
369
0
    }
370
371
    virtual Status get_aggregate_result(const FileAggregateRequest& request,
372
0
                                        FileAggregateResult* result) {
373
0
        return Status::NotSupported("FileReader does not support aggregate pushdown");
374
0
    }
375
376
    // Condition cache is managed by TableReader and consumed by physical file readers.
377
    // On cache HIT, readers may skip granules whose cached bit is false before doing column IO.
378
    // On cache MISS, readers mark a granule true when row-level predicates keep at least one row
379
    // in that granule. Readers that cannot map batch rows to stable file-global row ids should
380
    // keep the default no-op implementation.
381
0
    virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}
382
383
    // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap.
384
    // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable.
385
5
    virtual int64_t get_total_rows() const { return 0; }
386
387
    // 关闭当前物理文件 reader 并释放文件层状态。
388
    // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
389
177
    virtual Status close() {
390
177
        _file_reader.reset();
391
177
        _tracing_file_reader.reset();
392
177
        _io_ctx.reset();
393
177
        _eof = true;
394
177
        return Status::OK();
395
177
    }
396
397
protected:
398
12
    virtual void _init_profile() {}
399
400
    io::FileReaderSPtr _file_reader;
401
    // _tracing_file_reader wraps _file_reader.
402
    // _file_reader is original file reader.
403
    // _tracing_file_reader is tracing file reader with io context.
404
    // If io_ctx is null, _tracing_file_reader will be the same as file_reader.
405
    io::FileReaderSPtr _tracing_file_reader = nullptr;
406
    std::shared_ptr<FileScanRequest> _request;
407
    bool _eof = true;
408
    ReaderStatistics _reader_statistics;
409
    std::shared_ptr<io::FileSystemProperties> _system_properties;
410
    std::unique_ptr<io::FileDescription> _file_description;
411
    std::shared_ptr<io::IOContext> _io_ctx;
412
    RuntimeProfile* _profile = nullptr;
413
};
414
415
} // namespace doris::format