Coverage Report

Created: 2026-07-02 13:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/file_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//   http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing,
10
// software distributed under the License is distributed on an
11
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
12
// KIND, either express or implied.  See the License for the
13
// specific language governing permissions and limitations
14
// under the License.
15
16
#pragma once
17
18
#include <algorithm>
19
#include <cstddef>
20
#include <cstdint>
21
#include <map>
22
#include <memory>
23
#include <string>
24
#include <utility>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "core/data_type/data_type.h"
29
#include "core/field.h"
30
#include "exprs/vexpr_fwd.h"
31
#include "format_v2/column_data.h"
32
#include "gen_cpp/PlanNodes_types.h"
33
#include "io/file_factory.h"
34
#include "io/fs/file_reader_writer_fwd.h"
35
36
namespace doris {
37
class Block;
38
class ColumnPredicate;
39
struct ConditionCacheContext;
40
41
namespace io {
42
struct IOContext;
43
} // namespace io
44
} // namespace doris
45
46
namespace doris::format {
47
48
class TableColumnMapper;
49
struct TableColumnMapperOptions;
50
51
// File-local single-column predicates for file-layer pruning, such as min/max, page index,
52
// dictionary and bloom filter. Predicates must all belong to file_column_id.
53
// These predicates are pruning hints only and are not row-level conjuncts.
54
struct FileColumnPredicateFilter {
55
    LocalColumnId file_column_id = LocalColumnId::invalid();
56
    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
57
58
    std::string debug_string() const;
59
};
60
61
// Kinds of file format known to the format layer.
// Enumerators carry no explicit values, so they are numbered 0..7 in declaration order;
// keep the order stable if the numeric value is ever persisted or compared elsewhere.
enum class FileFormat {
    PARQUET, // 0
    ORC,     // 1
    CSV,     // 2
    JSON,    // 3
    TEXT,    // 4
    JNI,     // 5
    NATIVE,  // 6
    ARROW,   // 7
};
71
72
struct FileScanRequest {
73
277
    virtual ~FileScanRequest() = default;
74
75
    std::string debug_string() const;
76
77
    // Columns that must be read before row-level filtering. They are materialized eagerly because
78
    // conjuncts/delete_conjuncts need them to decide the selected rows.
79
    std::vector<LocalColumnIndex> predicate_columns;
80
    // Columns read after row-level filtering. Predicate columns are also available for output and
81
    // should not be duplicated here.
82
    std::vector<LocalColumnIndex> non_predicate_columns;
83
    // file-local column id -> file-local output block position.
84
    std::map<LocalColumnId, LocalIndex> local_positions;
85
    // Row-level filters converted to file-local expressions from table-level predicates.
86
    VExprContextSPtrs conjuncts;
87
    // Delete predicates converted to file-local expressions.
88
    VExprContextSPtrs delete_conjuncts;
89
    // Single-column predicates used only for file-layer pruning, such as statistics, page index,
90
    // dictionary and bloom filter. They must not be used for batch row-level filtering.
91
    std::vector<FileColumnPredicateFilter> column_predicate_filters;
92
};
93
94
// Helper for constructing the scan-column layout in FileScanRequest.
95
// FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such
96
// as Parquet can read predicate columns first, filter rows, and then lazily read the remaining
97
// projected columns. The two lists still share one file-local output block, whose positions are
98
// stored in local_positions. This builder centralizes the mechanical rules for that shared layout:
99
// - each root file column gets one stable block position;
100
// - predicate columns dominate non-predicate columns because they are already returned in the file
101
//   block and can be reused for final materialization;
102
// - repeated nested projections for the same root are merged instead of duplicated.
103
// TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the
104
// FileScanRequest layout contract after a file-local projection has been produced.
105
class FileScanRequestBuilder {
106
public:
107
186
    explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) {
108
186
        DORIS_CHECK(_request != nullptr);
109
186
    }
110
111
77
    Status add_predicate_column(LocalColumnIndex projection) {
112
77
        return _add_column(std::move(projection), &_request->predicate_columns,
113
77
                           /*is_predicate_column=*/true);
114
77
    }
115
116
116
    Status add_non_predicate_column(LocalColumnIndex projection) {
117
116
        return _add_column(std::move(projection), &_request->non_predicate_columns,
118
116
                           /*is_predicate_column=*/false);
119
116
    }
120
121
15
    Status add_predicate_column(LocalColumnId column_id) {
122
15
        return add_predicate_column(LocalColumnIndex::top_level(column_id));
123
15
    }
124
125
24
    Status add_non_predicate_column(LocalColumnId column_id) {
126
24
        return add_non_predicate_column(LocalColumnIndex::top_level(column_id));
127
24
    }
128
129
private:
130
182
    static LocalIndex _next_block_position(const FileScanRequest& request) {
131
182
        size_t next_position = 0;
132
182
        for (const auto& [_, block_position] : request.local_positions) {
133
70
            next_position = std::max(next_position, block_position.value() + 1);
134
70
        }
135
182
        return LocalIndex(next_position);
136
182
    }
137
138
245
    static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) {
139
245
        DORIS_CHECK(projection != nullptr);
140
245
        if (projection->project_all_children) {
141
211
            return;
142
211
        }
143
50
        for (auto& child : projection->children) {
144
50
            _sort_projection_children_by_file_id(&child);
145
50
        }
146
34
        std::ranges::sort(projection->children,
147
34
                          [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) {
148
23
                              return lhs.local_id() < rhs.local_id();
149
23
                          });
150
34
    }
151
152
    Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns,
153
193
                       bool is_predicate_column) {
154
193
        DORIS_CHECK(scan_columns != nullptr);
155
193
        const auto file_column_id = projection.column_id();
156
193
        DORIS_CHECK(file_column_id != LocalColumnId::invalid());
157
193
        if (!is_predicate_column &&
158
193
            std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) {
159
6
                return p.column_id() == file_column_id;
160
6
            }) != _request->predicate_columns.end()) {
161
2
            return Status::OK();
162
2
        }
163
191
        if (!_request->local_positions.contains(file_column_id)) {
164
182
            _request->local_positions.emplace(file_column_id, _next_block_position(*_request));
165
182
        }
166
167
191
        _sort_projection_children_by_file_id(&projection);
168
191
        auto existing_projection_it = std::ranges::find_if(
169
191
                *scan_columns,
170
191
                [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
171
191
        if (existing_projection_it == scan_columns->end()) {
172
187
            scan_columns->push_back(std::move(projection));
173
187
        } else {
174
4
            RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection));
175
4
            _sort_projection_children_by_file_id(&*existing_projection_it);
176
4
        }
177
178
191
        if (is_predicate_column) {
179
77
            auto it = std::ranges::find_if(
180
77
                    _request->non_predicate_columns,
181
77
                    [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; });
182
77
            if (it != _request->non_predicate_columns.end()) {
183
4
                _request->non_predicate_columns.erase(it);
184
4
            }
185
77
        }
186
191
        return Status::OK();
187
191
    }
188
189
    FileScanRequest* _request = nullptr;
190
};
191
192
struct FileAggregateRequest {
193
    struct Column {
194
        // File-local projection for the aggregate column. For nested MIN/MAX, this points to the
195
        // single primitive leaf that can be represented by file statistics. For COUNT(col), this
196
        // points to the top-level column whose NULL-ness should be counted.
197
        LocalColumnIndex projection;
198
    };
199
200
    TPushAggOp::type agg_type = TPushAggOp::type::NONE;
201
    // Empty for COUNT(*)/row-count pushdown. Non-empty for COUNT(col), where the file reader must
202
    // return the number of non-NULL rows for the requested column instead of total rows.
203
    std::vector<Column> columns;
204
};
205
206
struct FileAggregateResult {
207
    struct Column {
208
        // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned
209
        // aggregate value back into the matching projected nested shape.
210
        LocalColumnIndex projection;
211
        bool has_min = false;
212
        bool has_max = false;
213
        Field min_value;
214
        Field max_value;
215
    };
216
217
    int64_t count = 0;
218
    std::vector<Column> columns;
219
};
220
221
/**
222
 *                                +-----> get_schema() -----------------+
223
 * FileReader() -----> init() ----|                                      -----> close()
224
 *                                +-----> open() -----> get_block() ----+
225
 */
226
class FileReader {
227
public:
228
    struct ReaderStatistics {
229
        int32_t filtered_row_groups = 0;
230
        int32_t filtered_row_groups_by_min_max = 0;
231
        int32_t filtered_row_groups_by_bloom_filter = 0;
232
        int32_t read_row_groups = 0;
233
        int64_t filtered_group_rows = 0;
234
        int64_t filtered_page_rows = 0;
235
        int64_t lazy_read_filtered_rows = 0;
236
        int64_t read_rows = 0;
237
        int64_t filtered_bytes = 0;
238
        int64_t column_read_time = 0;
239
        int64_t parse_meta_time = 0;
240
        int64_t parse_footer_time = 0;
241
        int64_t file_footer_read_calls = 0;
242
        int64_t file_footer_hit_cache = 0;
243
        int64_t file_reader_create_time = 0;
244
        int64_t open_file_num = 0;
245
        int64_t row_group_filter_time = 0;
246
        int64_t page_index_filter_time = 0;
247
        int64_t read_page_index_time = 0;
248
        int64_t parse_page_index_time = 0;
249
        int64_t predicate_filter_time = 0;
250
        int64_t dict_filter_rewrite_time = 0;
251
        int64_t bloom_filter_read_time = 0;
252
    };
253
254
    FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
255
               std::unique_ptr<io::FileDescription>& file_description,
256
               std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
257
206
            : _system_properties(system_properties),
258
206
              _file_description(std::move(file_description)),
259
206
              _io_ctx(io_ctx),
260
206
              _profile(profile) {}
261
206
    virtual ~FileReader() = default;
262
263
    // Initialize file reader and parse file metadata.
264
    virtual Status init(RuntimeState* state);
265
266
    // Set the maximum row count for the next physical read batch. Readers that do not batch by
267
    // rows may ignore it.
268
0
    virtual void set_batch_size(size_t batch_size) { (void)batch_size; }
269
270
    // Get semantic file-local schema from file metadata. The file schema is determined by file
271
    // format and file content, and does not contain table/global schema semantics. A file reader may
272
    // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier,
273
    // but it must not interpret table-format semantics such as Iceberg name mapping,
274
    // default/generated columns, or partition columns. File-format physical wrappers should be
275
    // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value
276
    // children rather than key_value/entry.
277
    // Doris plans external-table scan types as nullable, including all nested children of complex
278
    // types. This protects Doris from illegal or inconsistent values produced by external systems.
279
    // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must
280
    // also expose nullable child types recursively, even if the physical file marks those fields as
281
    // required.
282
    // This method can only be called after init() successfully, but does not require open() to be
283
    // called.
284
    virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0;
285
286
    // Create the mapper that matches this reader's scan-request capabilities. TableReader still
287
    // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and
288
    // default expressions; the FileReader only chooses whether file-local requests support columnar
289
    // lazy materialization/pruning or must materialize one flat list of required columns.
290
    virtual std::unique_ptr<TableColumnMapper> create_column_mapper(
291
            TableColumnMapperOptions options) const;
292
293
    // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully.
294
184
    virtual Status open(std::shared_ptr<FileScanRequest> request) {
295
184
        _request = std::move(request);
296
184
        return Status::OK();
297
184
    }
298
299
0
    virtual Status get_block(Block* file_block, size_t* rows, bool* eof) {
300
0
        if (rows != nullptr) {
301
0
            *rows = 0;
302
0
        }
303
0
        if (eof != nullptr) {
304
0
            *eof = true;
305
0
        }
306
0
        _eof = true;
307
0
        return Status::OK();
308
0
    }
309
310
    virtual Status get_aggregate_result(const FileAggregateRequest& request,
311
0
                                        FileAggregateResult* result) {
312
0
        return Status::NotSupported("FileReader does not support aggregate pushdown");
313
0
    }
314
315
    // Condition cache is managed by TableReader and consumed by physical file readers.
316
    // On cache HIT, readers may skip granules whose cached bit is false before doing column IO.
317
    // On cache MISS, readers mark a granule true when row-level predicates keep at least one row
318
    // in that granule. Readers that cannot map batch rows to stable file-global row ids should
319
    // keep the default no-op implementation.
320
0
    virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}
321
322
    // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap.
323
    // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable.
324
0
    virtual int64_t get_total_rows() const { return 0; }
325
326
67
    virtual Status close() {
327
67
        _file_reader.reset();
328
67
        _tracing_file_reader.reset();
329
67
        _io_ctx.reset();
330
67
        _eof = true;
331
67
        return Status::OK();
332
67
    }
333
334
protected:
335
10
    virtual void _init_profile() {}
336
337
    io::FileReaderSPtr _file_reader;
338
    // _tracing_file_reader wraps _file_reader.
339
    // _file_reader is original file reader.
340
    // _tracing_file_reader is tracing file reader with io context.
341
    // If io_ctx is null, _tracing_file_reader will be the same as file_reader.
342
    io::FileReaderSPtr _tracing_file_reader = nullptr;
343
    std::shared_ptr<FileScanRequest> _request;
344
    bool _eof = true;
345
    ReaderStatistics _reader_statistics;
346
    std::shared_ptr<io::FileSystemProperties> _system_properties;
347
    std::unique_ptr<io::FileDescription> _file_description;
348
    std::shared_ptr<io::IOContext> _io_ctx;
349
    RuntimeProfile* _profile = nullptr;
350
};
351
352
} // namespace doris::format