be/src/format_v2/file_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstddef> |
22 | | #include <cstdint> |
23 | | #include <map> |
24 | | #include <memory> |
25 | | #include <string> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/status.h" |
30 | | #include "core/data_type/data_type.h" |
31 | | #include "core/field.h" |
32 | | #include "exprs/vexpr_fwd.h" |
33 | | #include "format_v2/column_data.h" |
34 | | #include "gen_cpp/PlanNodes_types.h" |
35 | | #include "io/file_factory.h" |
36 | | #include "io/fs/file_reader_writer_fwd.h" |
37 | | |
38 | | namespace doris { |
39 | | class Block; |
40 | | class ColumnPredicate; |
41 | | struct ConditionCacheContext; |
42 | | |
43 | | namespace io { |
44 | | struct IOContext; |
45 | | } // namespace io |
46 | | } // namespace doris |
47 | | |
48 | | namespace doris::format { |
49 | | |
50 | | class TableColumnMapper; |
51 | | struct TableColumnMapperOptions; |
52 | | |
53 | | // Struct-only nested predicate target used by file-layer pruning. |
54 | | // |
55 | | // This intentionally models only a STRUCT field chain. LIST/MAP/repeated predicates need explicit |
56 | | // quantified semantics, so they must not be encoded here. Each node stores one field's file-local id and child name and owns the next node of the chain through `child`. The copy operations are only declared in this header (a unique_ptr member is not copyable by default); presumably they deep-copy the chain - confirm in the definition. |
57 | | struct FileStructPredicateTarget { |
58 | | int32_t file_local_id = -1; |
59 | | std::string file_child_name; |
60 | | std::unique_ptr<FileStructPredicateTarget> child; |
61 | | |
62 | | FileStructPredicateTarget() = default; |
63 | | FileStructPredicateTarget(int32_t local_id, std::string child_name, |
64 | | std::unique_ptr<FileStructPredicateTarget> nested_child = nullptr) |
65 | 20 | : file_local_id(local_id), |
66 | 20 | file_child_name(std::move(child_name)), |
67 | 20 | child(std::move(nested_child)) {} |
68 | | FileStructPredicateTarget(const FileStructPredicateTarget& other); |
69 | | FileStructPredicateTarget& operator=(const FileStructPredicateTarget& other); |
70 | | FileStructPredicateTarget(FileStructPredicateTarget&& other) noexcept = default; |
71 | | FileStructPredicateTarget& operator=(FileStructPredicateTarget&& other) noexcept = default; |
72 | | }; |
73 | | |
74 | | struct FileNestedPredicateTarget { /* Root file column plus an optional struct field chain that a pruning predicate targets. */ |
75 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
76 | | // Null means the predicate targets the top-level primitive column itself. Otherwise this owns the head of the struct field chain under file_column_id. |
77 | | std::unique_ptr<FileStructPredicateTarget> struct_target; |
78 | | |
79 | 144 | FileNestedPredicateTarget() = default; |
80 | 23 | explicit FileNestedPredicateTarget(LocalColumnId column_id) : file_column_id(column_id) {} |
81 | | FileNestedPredicateTarget(LocalColumnId column_id, |
82 | | std::unique_ptr<FileStructPredicateTarget> target) |
83 | 18 | : file_column_id(column_id), struct_target(std::move(target)) {} |
84 | | FileNestedPredicateTarget(const FileNestedPredicateTarget& other); |
85 | | FileNestedPredicateTarget& operator=(const FileNestedPredicateTarget& other); |
86 | 105 | FileNestedPredicateTarget(FileNestedPredicateTarget&& other) noexcept = default; |
87 | 41 | FileNestedPredicateTarget& operator=(FileNestedPredicateTarget&& other) noexcept = default; |
88 | | |
89 | 433 | bool is_valid() const { return file_column_id.is_valid(); } /* Only the root column id is checked; struct_target is not consulted. */ |
90 | | }; |
91 | | |
92 | | // File-local single-column predicates for file-layer pruning, such as min/max, page index, |
93 | | // dictionary and bloom filter. |
94 | | // |
95 | | // Predicates must all belong to target.file_column_id. target.struct_target points to the nested |
96 | | // primitive leaf under that root; null means the top-level column itself is the primitive leaf. |
97 | | // These predicates are pruning hints only and are not row-level conjuncts. |
98 | | struct FileColumnPredicateFilter { |
99 | | FileNestedPredicateTarget target; |
100 | | // Compatibility fields for call sites and tests that still construct pruning filters directly. |
101 | | // New mapper code should fill target; file readers consume target first and only fall back to |
102 | | // these fields while the API migration is in progress. The effective_*() accessors below are only declared in this header; presumably they implement that target-first fallback - confirm in the definition. |
103 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
104 | | std::vector<int32_t> file_child_id_path; |
105 | | std::vector<std::shared_ptr<ColumnPredicate>> predicates; |
106 | | |
107 | | LocalColumnId effective_file_column_id() const; |
108 | | std::vector<int32_t> effective_file_child_id_path() const; |
109 | | bool same_target_as(const FileColumnPredicateFilter& other) const; |
110 | | std::string debug_string() const; |
111 | | }; |
112 | | |
113 | | enum class FileFormat { /* File format tags; nothing else in this header refers to them. */ |
114 | | PARQUET, |
115 | | ORC, |
116 | | CSV, |
117 | | JSON, |
118 | | TEXT, |
119 | | JNI, |
120 | | NATIVE, |
121 | | }; |
122 | | |
123 | | // Generic file-layer scan request. |
124 | | // This struct describes the file-local read input that all file formats can share. No table/global |
125 | | // schema appears here. All schema change, filter localization and default/generated/partition |
126 | | // column handling should be done in the table layer. |
127 | | struct FileScanRequest { |
128 | 731 | virtual ~FileScanRequest() = default; |
129 | | |
130 | | std::string debug_string() const; |
131 | | |
132 | | // Columns that must be read before row-level filtering. They are materialized eagerly because |
133 | | // conjuncts/delete_conjuncts need them to decide the selected rows. |
134 | | std::vector<LocalColumnIndex> predicate_columns; |
135 | | // Columns read after row-level filtering. Predicate columns are also available for output and |
136 | | // should not be duplicated here. |
137 | | std::vector<LocalColumnIndex> non_predicate_columns; |
138 | | // file-local column id -> file-local output block position. |
139 | | std::map<LocalColumnId, LocalIndex> local_positions; |
140 | | // Row-level filters converted to file-local expressions from table-level predicates. |
141 | | VExprContextSPtrs conjuncts; |
142 | | // Delete predicates converted to file-local expressions. |
143 | | VExprContextSPtrs delete_conjuncts; |
144 | | // Single-column predicates used only for file-layer pruning, such as statistics, page index, |
145 | | // dictionary and bloom filter. They must not be used for batch row-level filtering. |
146 | | std::vector<FileColumnPredicateFilter> column_predicate_filters; |
147 | | }; |
148 | | |
149 | | // Helper for constructing the scan-column layout in FileScanRequest. |
150 | | // |
151 | | // FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such |
152 | | // as Parquet can read predicate columns first, filter rows, and then lazily read the remaining |
153 | | // projected columns. The two lists still share one file-local output block, whose positions are |
154 | | // stored in local_positions. This builder centralizes the mechanical rules for that shared layout: |
155 | | // - each root file column gets one stable block position; |
156 | | // - predicate columns dominate non-predicate columns because they are already returned in the file |
157 | | // block and can be reused for final materialization; |
158 | | // - repeated nested projections for the same root are merged instead of duplicated. NOTE(review): that merge is per list only. Adding a non-predicate projection for a root that is already a predicate column is a no-op, and adding a predicate projection erases the root's non-predicate entry, so nested children named only by the non-predicate projection are not carried over - confirm the mapper never needs them. |
159 | | // |
160 | | // TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the |
161 | | // FileScanRequest layout contract after a file-local projection has been produced. |
162 | | class FileScanRequestBuilder { |
163 | | public: |
164 | 3.25k | explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) { |
165 | 3.25k | DORIS_CHECK(_request != nullptr); |
166 | 3.25k | } |
167 | | |
168 | 79 | Status add_predicate_column(LocalColumnIndex projection) { |
169 | 79 | return _add_column(std::move(projection), &_request->predicate_columns, |
170 | 79 | /*is_predicate_column=*/true); |
171 | 79 | } |
172 | | |
173 | 3.17k | Status add_non_predicate_column(LocalColumnIndex projection) { |
174 | 3.17k | return _add_column(std::move(projection), &_request->non_predicate_columns, |
175 | 3.17k | /*is_predicate_column=*/false); |
176 | 3.17k | } |
177 | | |
178 | 14 | Status add_predicate_column(LocalColumnId column_id) { |
179 | 14 | return add_predicate_column(LocalColumnIndex::top_level(column_id)); |
180 | 14 | } |
181 | | |
182 | 18 | Status add_non_predicate_column(LocalColumnId column_id) { |
183 | 18 | return add_non_predicate_column(LocalColumnIndex::top_level(column_id)); |
184 | 18 | } |
185 | | |
186 | | private: |
187 | 3.24k | static LocalIndex _next_block_position(const FileScanRequest& request) { /* Next free slot: largest assigned position + 1, or 0 when local_positions is empty. */ |
188 | 3.24k | size_t next_position = 0; |
189 | 19.5k | for (const auto& [_, block_position] : request.local_positions) { |
190 | 19.5k | next_position = std::max(next_position, block_position.value() + 1); |
191 | 19.5k | } |
192 | 3.24k | return LocalIndex(next_position); |
193 | 3.24k | } |
194 | | |
195 | 3.31k | static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) { /* Recursively orders children by local_id(); left untouched when project_all_children is set. */ |
196 | 3.31k | DORIS_CHECK(projection != nullptr); |
197 | 3.31k | if (projection->project_all_children) { |
198 | 3.28k | return; |
199 | 3.28k | } |
200 | 50 | for (auto& child : projection->children) { |
201 | 50 | _sort_projection_children_by_file_id(&child); |
202 | 50 | } |
203 | 34 | std::ranges::sort(projection->children, |
204 | 34 | [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) { |
205 | 23 | return lhs.local_id() < rhs.local_id(); |
206 | 23 | }); |
207 | 34 | } |
208 | | |
209 | | Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns, |
210 | 3.25k | bool is_predicate_column) { |
211 | 3.25k | DORIS_CHECK(scan_columns != nullptr); |
212 | 3.25k | const auto file_column_id = projection.column_id(); |
213 | 3.25k | DORIS_CHECK(file_column_id != LocalColumnId::invalid()); |
214 | 3.25k | if (!is_predicate_column && |
215 | 3.25k | std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) { |
216 | 5 | return p.column_id() == file_column_id; |
217 | 5 | }) != _request->predicate_columns.end()) { |
218 | 2 | return Status::OK(); |
219 | 2 | } |
220 | 3.25k | if (!_request->local_positions.contains(file_column_id)) { |
221 | 3.24k | _request->local_positions.emplace(file_column_id, _next_block_position(*_request)); |
222 | 3.24k | } |
223 | | |
224 | 3.25k | _sort_projection_children_by_file_id(&projection); |
225 | 3.25k | auto existing_projection_it = std::ranges::find_if( |
226 | 3.25k | *scan_columns, |
227 | 19.4k | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
228 | 3.25k | if (existing_projection_it == scan_columns->end()) { |
229 | 3.24k | scan_columns->push_back(std::move(projection)); |
230 | 3.24k | } else { |
231 | 9 | RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection)); |
232 | 9 | _sort_projection_children_by_file_id(&*existing_projection_it); |
233 | 9 | } |
234 | | |
235 | 3.25k | if (is_predicate_column) { |
236 | 79 | auto it = std::ranges::find_if( |
237 | 79 | _request->non_predicate_columns, |
238 | 79 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
239 | 79 | if (it != _request->non_predicate_columns.end()) { |
240 | 4 | _request->non_predicate_columns.erase(it); |
241 | 4 | } |
242 | 79 | } |
243 | 3.25k | return Status::OK(); |
244 | 3.25k | } |
245 | | |
246 | | FileScanRequest* _request = nullptr; |
247 | | }; |
248 | | |
249 | | struct FileAggregateRequest { /* Input of FileReader::get_aggregate_result(): the aggregate kind and the columns it applies to. */ |
250 | | struct Column { |
251 | | // File-local projection for the aggregate column. For nested MIN/MAX, this points to the |
252 | | // single primitive leaf that can be represented by file statistics. |
253 | | LocalColumnIndex projection; |
254 | | }; |
255 | | |
256 | | TPushAggOp::type agg_type = TPushAggOp::type::NONE; |
257 | | std::vector<Column> columns; |
258 | | }; |
259 | | |
260 | | struct FileAggregateResult { /* Output of FileReader::get_aggregate_result(). */ |
261 | | struct Column { |
262 | | // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned |
263 | | // aggregate value back into the matching projected nested shape. has_min/has_max presumably flag whether min_value/max_value were filled - confirm with the producing reader. |
264 | | LocalColumnIndex projection; |
265 | | bool has_min = false; |
266 | | bool has_max = false; |
267 | | Field min_value; |
268 | | Field max_value; |
269 | | }; |
270 | | |
271 | | int64_t count = 0; |
272 | | std::vector<Column> columns; |
273 | | }; |
274 | | |
275 | | // Generic interface of the physical file-reading layer. |
276 | | // This interface only describes the file-local schema, file-local scan request and file-local block. |
277 | | // TableReader/IcebergTableReader can use it to compose readers for different file formats. |
278 | | /** |
279 | | * +-----> get_schema() -----------------+ |
280 | | * FileReader() -----> init() ----| -----> close() |
281 | | * +-----> open() -----> get_block() ----+ |
282 | | */ |
283 | | class FileReader { |
284 | | public: |
285 | | struct ReaderStatistics { |
286 | | int32_t filtered_row_groups = 0; |
287 | | int32_t filtered_row_groups_by_min_max = 0; |
288 | | int32_t filtered_row_groups_by_bloom_filter = 0; |
289 | | int32_t read_row_groups = 0; |
290 | | int64_t filtered_group_rows = 0; |
291 | | int64_t filtered_page_rows = 0; |
292 | | int64_t lazy_read_filtered_rows = 0; |
293 | | int64_t read_rows = 0; |
294 | | int64_t filtered_bytes = 0; |
295 | | int64_t column_read_time = 0; |
296 | | int64_t parse_meta_time = 0; |
297 | | int64_t parse_footer_time = 0; |
298 | | int64_t file_footer_read_calls = 0; |
299 | | int64_t file_footer_hit_cache = 0; |
300 | | int64_t file_reader_create_time = 0; |
301 | | int64_t open_file_num = 0; |
302 | | int64_t row_group_filter_time = 0; |
303 | | int64_t page_index_filter_time = 0; |
304 | | int64_t read_page_index_time = 0; |
305 | | int64_t parse_page_index_time = 0; |
306 | | int64_t predicate_filter_time = 0; |
307 | | int64_t dict_filter_rewrite_time = 0; |
308 | | int64_t bloom_filter_read_time = 0; |
309 | | }; |
310 | | |
311 | | FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties, |
312 | | std::unique_ptr<io::FileDescription>& file_description, /* Moved from: the caller's unique_ptr is left null after construction. */ |
313 | | std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile) |
314 | 659 | : _system_properties(system_properties), |
315 | 659 | _file_description(std::move(file_description)), |
316 | 659 | _io_ctx(std::move(io_ctx)), /* io_ctx is a by-value sink parameter: move it instead of copying the shared_ptr a second time. */ |
317 | 659 | _profile(profile) {} |
318 | 659 | virtual ~FileReader() = default; |
319 | | |
320 | | // Initialize file reader and parse file metadata. |
321 | | virtual Status init(RuntimeState* state); |
322 | | |
323 | | // Get semantic file-local schema from file metadata. The file schema is determined by file |
324 | | // format and file content, and does not contain table/global schema semantics. A file reader may |
325 | | // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier, |
326 | | // but it must not interpret table-format semantics such as Iceberg name mapping, |
327 | | // default/generated columns, or partition columns. File-format physical wrappers should be |
328 | | // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value |
329 | | // children rather than key_value/entry. |
330 | | // |
331 | | // Doris plans external-table scan types as nullable, including all nested children of complex |
332 | | // types. This protects Doris from illegal or inconsistent values produced by external systems. |
333 | | // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must |
334 | | // also expose nullable child types recursively, even if the physical file marks those fields as |
335 | | // required. |
336 | | // |
337 | | // This method can only be called after init() successfully, but does not require open() to be |
338 | | // called. |
339 | | virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0; |
340 | | |
341 | | // Create the mapper that matches this reader's scan-request capabilities. TableReader still |
342 | | // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and |
343 | | // default expressions; the FileReader only chooses whether file-local requests support columnar |
344 | | // lazy materialization/pruning or must materialize one flat list of required columns. |
345 | | virtual std::unique_ptr<TableColumnMapper> create_column_mapper( |
346 | | TableColumnMapperOptions options) const; |
347 | | |
348 | | // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully. |
349 | 638 | virtual Status open(std::shared_ptr<FileScanRequest> request) { |
350 | 638 | _request = std::move(request); |
351 | 638 | return Status::OK(); |
352 | 638 | } |
353 | | |
354 | | // Read the next batch as a file-local block. |
355 | | // This method may only be called after open(FileScanRequest) has succeeded. |
356 | | // The column order and types of file_block must follow FileScanRequest, not the table/global schema. |
357 | | // rows returns the number of rows output in the current batch; eof reports whether this file reader is exhausted; switching between files is |
358 | | // handled by TableReader. |
359 | 0 | virtual Status get_block(Block* file_block, size_t* rows, bool* eof) { |
360 | | // The default stub reports EOF immediately. |
361 | 0 | if (rows != nullptr) { |
362 | 0 | *rows = 0; |
363 | 0 | } |
364 | 0 | if (eof != nullptr) { |
365 | 0 | *eof = true; |
366 | 0 | } |
367 | 0 | _eof = true; |
368 | 0 | return Status::OK(); |
369 | 0 | } |
370 | | |
371 | | virtual Status get_aggregate_result(const FileAggregateRequest& request, |
372 | 0 | FileAggregateResult* result) { |
373 | 0 | return Status::NotSupported("FileReader does not support aggregate pushdown"); |
374 | 0 | } |
375 | | |
376 | | // Condition cache is managed by TableReader and consumed by physical file readers. |
377 | | // On cache HIT, readers may skip granules whose cached bit is false before doing column IO. |
378 | | // On cache MISS, readers mark a granule true when row-level predicates keep at least one row |
379 | | // in that granule. Readers that cannot map batch rows to stable file-global row ids should |
380 | | // keep the default no-op implementation. |
381 | 0 | virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {} |
382 | | |
383 | | // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap. |
384 | | // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable. |
385 | 5 | virtual int64_t get_total_rows() const { return 0; } |
386 | | |
387 | | // Close the current physical file reader and release file-layer state. |
388 | | // This method does not handle table-level delete/finalize state, which is managed by TableReader subclasses. |
389 | 177 | virtual Status close() { |
390 | 177 | _file_reader.reset(); |
391 | 177 | _tracing_file_reader.reset(); |
392 | 177 | _io_ctx.reset(); |
393 | 177 | _eof = true; |
394 | 177 | return Status::OK(); |
395 | 177 | } |
396 | | |
397 | | protected: |
398 | 12 | virtual void _init_profile() {} |
399 | | |
400 | | io::FileReaderSPtr _file_reader; |
401 | | // _tracing_file_reader wraps _file_reader. |
402 | | // _file_reader is original file reader. |
403 | | // _tracing_file_reader is tracing file reader with io context. |
404 | | // If io_ctx is null, _tracing_file_reader will be the same as file_reader. |
405 | | io::FileReaderSPtr _tracing_file_reader = nullptr; |
406 | | std::shared_ptr<FileScanRequest> _request; |
407 | | bool _eof = true; |
408 | | ReaderStatistics _reader_statistics; |
409 | | std::shared_ptr<io::FileSystemProperties> _system_properties; |
410 | | std::unique_ptr<io::FileDescription> _file_description; |
411 | | std::shared_ptr<io::IOContext> _io_ctx; |
412 | | RuntimeProfile* _profile = nullptr; |
413 | | }; |
414 | | |
415 | | } // namespace doris::format |