Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstddef> |
22 | | #include <cstdint> |
23 | | #include <map> |
24 | | #include <memory> |
25 | | #include <string> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/status.h" |
30 | | #include "core/data_type/data_type.h" |
31 | | #include "core/field.h" |
32 | | #include "exprs/vexpr_fwd.h" |
33 | | #include "format_v2/column_data.h" |
34 | | #include "gen_cpp/PlanNodes_types.h" |
35 | | #include "io/file_factory.h" |
36 | | #include "io/fs/file_reader_writer_fwd.h" |
37 | | |
38 | | namespace doris { |
39 | | class Block; |
40 | | class ColumnPredicate; |
41 | | struct ConditionCacheContext; |
42 | | |
43 | | namespace io { |
44 | | struct IOContext; |
45 | | } // namespace io |
46 | | } // namespace doris |
47 | | |
48 | | namespace doris::format { |
49 | | |
50 | | class TableColumnMapper; |
51 | | struct TableColumnMapperOptions; |
52 | | |
// Nested predicate target that follows a chain of STRUCT fields only; used by file-layer pruning.
//
// LIST/MAP/repeated predicates are deliberately not representable here: they need explicit
// quantified semantics, which a plain field chain cannot express.
struct FileStructPredicateTarget {
    // File-local id of this struct field; -1 when unset.
    int32_t file_local_id {-1};
    // File-local name of this struct field.
    std::string file_child_name;
    // Next struct field down the chain; null when the chain ends here.
    std::unique_ptr<FileStructPredicateTarget> child;

    FileStructPredicateTarget() = default;
    FileStructPredicateTarget(int32_t local_id, std::string child_name,
                              std::unique_ptr<FileStructPredicateTarget> nested_child = nullptr)
            : file_local_id {local_id},
              file_child_name {std::move(child_name)},
              child {std::move(nested_child)} {}

    // Copy operations are defined out of line (presumably deep-copying `child` — see the .cpp).
    FileStructPredicateTarget(const FileStructPredicateTarget& other);
    FileStructPredicateTarget& operator=(const FileStructPredicateTarget& other);

    // Moves transfer ownership of the whole chain.
    FileStructPredicateTarget(FileStructPredicateTarget&& other) noexcept = default;
    FileStructPredicateTarget& operator=(FileStructPredicateTarget&& other) noexcept = default;
};
72 | | }; |
73 | | |
74 | | struct FileNestedPredicateTarget { |
75 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
76 | | // Null means the predicate targets the top-level primitive column itself. |
77 | | std::unique_ptr<FileStructPredicateTarget> struct_target; |
78 | | |
79 | 136 | FileNestedPredicateTarget() = default; |
80 | 19 | explicit FileNestedPredicateTarget(LocalColumnId column_id) : file_column_id(column_id) {} |
81 | | FileNestedPredicateTarget(LocalColumnId column_id, |
82 | | std::unique_ptr<FileStructPredicateTarget> target) |
83 | 18 | : file_column_id(column_id), struct_target(std::move(target)) {} |
84 | | FileNestedPredicateTarget(const FileNestedPredicateTarget& other); |
85 | | FileNestedPredicateTarget& operator=(const FileNestedPredicateTarget& other); |
86 | 101 | FileNestedPredicateTarget(FileNestedPredicateTarget&& other) noexcept = default; |
87 | 37 | FileNestedPredicateTarget& operator=(FileNestedPredicateTarget&& other) noexcept = default; |
88 | | |
89 | 353 | bool is_valid() const { return file_column_id.is_valid(); } |
90 | | }; |
91 | | |
92 | | // File-local single-column predicates for file-layer pruning, such as min/max, page index, |
93 | | // dictionary and bloom filter. |
94 | | // |
95 | | // Predicates must all belong to target.file_column_id. target.struct_target points to the nested |
96 | | // primitive leaf under that root; null means the top-level column itself is the primitive leaf. |
97 | | // These predicates are pruning hints only and are not row-level conjuncts. |
98 | | struct FileColumnPredicateFilter { |
99 | | FileNestedPredicateTarget target; |
100 | | // Compatibility fields for call sites and tests that still construct pruning filters directly. |
101 | | // New mapper code should fill target; file readers consume target first and only fall back to |
102 | | // these fields while the API migration is in progress. |
103 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
104 | | std::vector<int32_t> file_child_id_path; |
105 | | std::vector<std::shared_ptr<ColumnPredicate>> predicates; |
106 | | |
107 | | LocalColumnId effective_file_column_id() const; |
108 | | std::vector<int32_t> effective_file_child_id_path() const; |
109 | | bool same_target_as(const FileColumnPredicateFilter& other) const; |
110 | | std::string debug_string() const; |
111 | | }; |
112 | | |
// Physical file format kinds. The explicit values are identical to the implicit 0-based numbering.
enum class FileFormat {
    PARQUET = 0,
    ORC = 1,
    CSV = 2,
    JSON = 3,
    TEXT = 4,
    JNI = 5,
};
121 | | |
122 | | // 通用文件层 scan 请求。 |
123 | | // 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global |
124 | | // schema。所有 schema change、filter localization、default/generated/partition |
125 | | // 列都应在 table 层完成。 |
126 | | struct FileScanRequest { |
127 | 254 | virtual ~FileScanRequest() = default; |
128 | | |
129 | | std::string debug_string() const; |
130 | | |
131 | | // Columns that must be read before row-level filtering. They are materialized eagerly because |
132 | | // conjuncts/delete_conjuncts need them to decide the selected rows. |
133 | | std::vector<LocalColumnIndex> predicate_columns; |
134 | | // Columns read after row-level filtering. Predicate columns are also available for output and |
135 | | // should not be duplicated here. |
136 | | std::vector<LocalColumnIndex> non_predicate_columns; |
137 | | // file-local column id -> file-local output block position. |
138 | | std::map<LocalColumnId, LocalIndex> local_positions; |
139 | | // Row-level filters converted to file-local expressions from table-level predicates. |
140 | | VExprContextSPtrs conjuncts; |
141 | | // Delete predicates converted to file-local expressions. |
142 | | VExprContextSPtrs delete_conjuncts; |
143 | | // Single-column predicates used only for file-layer pruning, such as statistics, page index, |
144 | | // dictionary and bloom filter. They must not be used for batch row-level filtering. |
145 | | std::vector<FileColumnPredicateFilter> column_predicate_filters; |
146 | | }; |
147 | | |
148 | | // Helper for constructing the scan-column layout in FileScanRequest. |
149 | | // |
150 | | // FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such |
151 | | // as Parquet can read predicate columns first, filter rows, and then lazily read the remaining |
152 | | // projected columns. The two lists still share one file-local output block, whose positions are |
153 | | // stored in local_positions. This builder centralizes the mechanical rules for that shared layout: |
154 | | // - each root file column gets one stable block position; |
155 | | // - predicate columns dominate non-predicate columns because they are already returned in the file |
156 | | // block and can be reused for final materialization; |
157 | | // - repeated nested projections for the same root are merged instead of duplicated. |
158 | | // |
159 | | // TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the |
160 | | // FileScanRequest layout contract after a file-local projection has been produced. |
161 | | class FileScanRequestBuilder { |
162 | | public: |
163 | 176 | explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) { |
164 | 176 | DORIS_CHECK(_request != nullptr); |
165 | 176 | } |
166 | | |
167 | 75 | Status add_predicate_column(LocalColumnIndex projection) { |
168 | 75 | return _add_column(std::move(projection), &_request->predicate_columns, |
169 | 75 | /*is_predicate_column=*/true); |
170 | 75 | } |
171 | | |
172 | 104 | Status add_non_predicate_column(LocalColumnIndex projection) { |
173 | 104 | return _add_column(std::move(projection), &_request->non_predicate_columns, |
174 | 104 | /*is_predicate_column=*/false); |
175 | 104 | } |
176 | | |
177 | 14 | Status add_predicate_column(LocalColumnId column_id) { |
178 | 14 | return add_predicate_column(LocalColumnIndex::top_level(column_id)); |
179 | 14 | } |
180 | | |
181 | 13 | Status add_non_predicate_column(LocalColumnId column_id) { |
182 | 13 | return add_non_predicate_column(LocalColumnIndex::top_level(column_id)); |
183 | 13 | } |
184 | | |
185 | | private: |
186 | 168 | static LocalIndex _next_block_position(const FileScanRequest& request) { |
187 | 168 | size_t next_position = 0; |
188 | 168 | for (const auto& [_, block_position] : request.local_positions) { |
189 | 65 | next_position = std::max(next_position, block_position.value() + 1); |
190 | 65 | } |
191 | 168 | return LocalIndex(next_position); |
192 | 168 | } |
193 | | |
194 | 231 | static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) { |
195 | 231 | DORIS_CHECK(projection != nullptr); |
196 | 231 | if (projection->project_all_children) { |
197 | 197 | return; |
198 | 197 | } |
199 | 50 | for (auto& child : projection->children) { |
200 | 50 | _sort_projection_children_by_file_id(&child); |
201 | 50 | } |
202 | 34 | std::ranges::sort(projection->children, |
203 | 34 | [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) { |
204 | 23 | return lhs.local_id() < rhs.local_id(); |
205 | 23 | }); |
206 | 34 | } |
207 | | |
208 | | Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns, |
209 | 179 | bool is_predicate_column) { |
210 | 179 | DORIS_CHECK(scan_columns != nullptr); |
211 | 179 | const auto file_column_id = projection.column_id(); |
212 | 179 | DORIS_CHECK(file_column_id != LocalColumnId::invalid()); |
213 | 179 | if (!is_predicate_column && |
214 | 179 | std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) { |
215 | 5 | return p.column_id() == file_column_id; |
216 | 5 | }) != _request->predicate_columns.end()) { |
217 | 2 | return Status::OK(); |
218 | 2 | } |
219 | 177 | if (!_request->local_positions.contains(file_column_id)) { |
220 | 168 | _request->local_positions.emplace(file_column_id, _next_block_position(*_request)); |
221 | 168 | } |
222 | | |
223 | 177 | _sort_projection_children_by_file_id(&projection); |
224 | 177 | auto existing_projection_it = std::ranges::find_if( |
225 | 177 | *scan_columns, |
226 | 177 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
227 | 177 | if (existing_projection_it == scan_columns->end()) { |
228 | 173 | scan_columns->push_back(std::move(projection)); |
229 | 173 | } else { |
230 | 4 | RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection)); |
231 | 4 | _sort_projection_children_by_file_id(&*existing_projection_it); |
232 | 4 | } |
233 | | |
234 | 177 | if (is_predicate_column) { |
235 | 75 | auto it = std::ranges::find_if( |
236 | 75 | _request->non_predicate_columns, |
237 | 75 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
238 | 75 | if (it != _request->non_predicate_columns.end()) { |
239 | 4 | _request->non_predicate_columns.erase(it); |
240 | 4 | } |
241 | 75 | } |
242 | 177 | return Status::OK(); |
243 | 177 | } |
244 | | |
245 | | FileScanRequest* _request = nullptr; |
246 | | }; |
247 | | |
248 | | struct FileAggregateRequest { |
249 | | struct Column { |
250 | | // File-local projection for the aggregate column. For nested MIN/MAX, this points to the |
251 | | // single primitive leaf that can be represented by file statistics. |
252 | | LocalColumnIndex projection; |
253 | | }; |
254 | | |
255 | | TPushAggOp::type agg_type = TPushAggOp::type::NONE; |
256 | | std::vector<Column> columns; |
257 | | }; |
258 | | |
259 | | struct FileAggregateResult { |
260 | | struct Column { |
261 | | // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned |
262 | | // aggregate value back into the matching projected nested shape. |
263 | | LocalColumnIndex projection; |
264 | | bool has_min = false; |
265 | | bool has_max = false; |
266 | | Field min_value; |
267 | | Field max_value; |
268 | | }; |
269 | | |
270 | | int64_t count = 0; |
271 | | std::vector<Column> columns; |
272 | | }; |
273 | | |
274 | | // 文件物理读取层通用接口。 |
275 | | // 该接口只描述 file-local schema、file-local scan request 和 file-local block。 |
276 | | // TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。 |
277 | | /** |
278 | | * +-----> get_schema() -----------------+ |
279 | | * FileReader() -----> init() ----| -----> close() |
280 | | * +-----> open() -----> get_block() ----+ |
281 | | */ |
282 | | class FileReader { |
283 | | public: |
284 | | struct ReaderStatistics { |
285 | | int32_t filtered_row_groups = 0; |
286 | | int32_t filtered_row_groups_by_min_max = 0; |
287 | | int32_t filtered_row_groups_by_bloom_filter = 0; |
288 | | int32_t read_row_groups = 0; |
289 | | int64_t filtered_group_rows = 0; |
290 | | int64_t filtered_page_rows = 0; |
291 | | int64_t lazy_read_filtered_rows = 0; |
292 | | int64_t read_rows = 0; |
293 | | int64_t filtered_bytes = 0; |
294 | | int64_t column_read_time = 0; |
295 | | int64_t parse_meta_time = 0; |
296 | | int64_t parse_footer_time = 0; |
297 | | int64_t file_footer_read_calls = 0; |
298 | | int64_t file_footer_hit_cache = 0; |
299 | | int64_t file_reader_create_time = 0; |
300 | | int64_t open_file_num = 0; |
301 | | int64_t row_group_filter_time = 0; |
302 | | int64_t page_index_filter_time = 0; |
303 | | int64_t read_page_index_time = 0; |
304 | | int64_t parse_page_index_time = 0; |
305 | | int64_t predicate_filter_time = 0; |
306 | | int64_t dict_filter_rewrite_time = 0; |
307 | | int64_t bloom_filter_read_time = 0; |
308 | | }; |
309 | | |
310 | | FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties, |
311 | | std::unique_ptr<io::FileDescription>& file_description, |
312 | | std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile) |
313 | 175 | : _system_properties(system_properties), |
314 | 175 | _file_description(std::move(file_description)), |
315 | 175 | _io_ctx(io_ctx), |
316 | 175 | _profile(profile) {} |
317 | 175 | virtual ~FileReader() = default; |
318 | | |
319 | | // Initialize file reader and parse file metadata. |
320 | | virtual Status init(RuntimeState* state); |
321 | | |
322 | | // Get semantic file-local schema from file metadata. The file schema is determined by file |
323 | | // format and file content, and does not contain table/global schema semantics. A file reader may |
324 | | // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier, |
325 | | // but it must not interpret table-format semantics such as Iceberg name mapping, |
326 | | // default/generated columns, or partition columns. File-format physical wrappers should be |
327 | | // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value |
328 | | // children rather than key_value/entry. |
329 | | // |
330 | | // Doris plans external-table scan types as nullable, including all nested children of complex |
331 | | // types. This protects Doris from illegal or inconsistent values produced by external systems. |
332 | | // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must |
333 | | // also expose nullable child types recursively, even if the physical file marks those fields as |
334 | | // required. |
335 | | // |
336 | | // This method can only be called after init() successfully, but does not require open() to be |
337 | | // called. |
338 | | virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0; |
339 | | |
340 | | // Create the mapper that matches this reader's scan-request capabilities. TableReader still |
341 | | // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and |
342 | | // default expressions; the FileReader only chooses whether file-local requests support columnar |
343 | | // lazy materialization/pruning or must materialize one flat list of required columns. |
344 | | virtual std::unique_ptr<TableColumnMapper> create_column_mapper( |
345 | | TableColumnMapperOptions options) const; |
346 | | |
347 | | // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully. |
348 | 161 | virtual Status open(std::shared_ptr<FileScanRequest> request) { |
349 | 161 | _request = std::move(request); |
350 | 161 | return Status::OK(); |
351 | 161 | } |
352 | | |
353 | | // 读取下一批 file-local block。 |
354 | | // 该方法只能在 open(FileScanRequest) 成功后调用。 |
355 | | // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。 |
356 | | // rows 返回当前批次输出行数;eof 表示当前文件 reader 是否读完;多文件切换由 |
357 | | // TableReader 负责。 |
358 | 0 | virtual Status get_block(Block* file_block, size_t* rows, bool* eof) { |
359 | | // stub 默认立即 EOF。 |
360 | 0 | if (rows != nullptr) { |
361 | 0 | *rows = 0; |
362 | 0 | } |
363 | 0 | if (eof != nullptr) { |
364 | 0 | *eof = true; |
365 | 0 | } |
366 | 0 | _eof = true; |
367 | 0 | return Status::OK(); |
368 | 0 | } |
369 | | |
370 | | virtual Status get_aggregate_result(const FileAggregateRequest& request, |
371 | 0 | FileAggregateResult* result) { |
372 | 0 | return Status::NotSupported("FileReader does not support aggregate pushdown"); |
373 | 0 | } |
374 | | |
375 | | // Condition cache is managed by TableReader and consumed by physical file readers. |
376 | | // On cache HIT, readers may skip granules whose cached bit is false before doing column IO. |
377 | | // On cache MISS, readers mark a granule true when row-level predicates keep at least one row |
378 | | // in that granule. Readers that cannot map batch rows to stable file-global row ids should |
379 | | // keep the default no-op implementation. |
380 | 0 | virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {} |
381 | | |
382 | | // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap. |
383 | | // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable. |
384 | 0 | virtual int64_t get_total_rows() const { return 0; } |
385 | | |
386 | | // 关闭当前物理文件 reader 并释放文件层状态。 |
387 | | // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。 |
388 | 67 | virtual Status close() { |
389 | 67 | _file_reader.reset(); |
390 | 67 | _tracing_file_reader.reset(); |
391 | 67 | _io_ctx.reset(); |
392 | 67 | _eof = true; |
393 | 67 | return Status::OK(); |
394 | 67 | } |
395 | | |
396 | | protected: |
397 | 0 | virtual void _init_profile() {} |
398 | | |
399 | | io::FileReaderSPtr _file_reader; |
400 | | // _tracing_file_reader wraps _file_reader. |
401 | | // _file_reader is original file reader. |
402 | | // _tracing_file_reader is tracing file reader with io context. |
403 | | // If io_ctx is null, _tracing_file_reader will be the same as file_reader. |
404 | | io::FileReaderSPtr _tracing_file_reader = nullptr; |
405 | | std::shared_ptr<FileScanRequest> _request; |
406 | | bool _eof = true; |
407 | | ReaderStatistics _reader_statistics; |
408 | | std::shared_ptr<io::FileSystemProperties> _system_properties; |
409 | | std::unique_ptr<io::FileDescription> _file_description; |
410 | | std::shared_ptr<io::IOContext> _io_ctx; |
411 | | RuntimeProfile* _profile = nullptr; |
412 | | }; |
413 | | |
414 | | } // namespace doris::format |