be/src/format_v2/file_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // http://www.apache.org/licenses/LICENSE-2.0 |
9 | | // Unless required by applicable law or agreed to in writing, |
10 | | // software distributed under the License is distributed on an |
11 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
12 | | // KIND, either express or implied. See the License for the |
13 | | // specific language governing permissions and limitations |
14 | | // under the License. |
15 | | |
16 | | #pragma once |
17 | | |
18 | | #include <algorithm> |
19 | | #include <cstddef> |
20 | | #include <cstdint> |
21 | | #include <map> |
22 | | #include <memory> |
23 | | #include <string> |
24 | | #include <utility> |
25 | | #include <vector> |
26 | | |
27 | | #include "common/status.h" |
28 | | #include "core/data_type/data_type.h" |
29 | | #include "core/field.h" |
30 | | #include "exprs/vexpr_fwd.h" |
31 | | #include "format_v2/column_data.h" |
32 | | #include "gen_cpp/PlanNodes_types.h" |
33 | | #include "io/file_factory.h" |
34 | | #include "io/fs/file_reader_writer_fwd.h" |
35 | | |
36 | | namespace doris { |
37 | | class Block; |
38 | | class ColumnPredicate; |
39 | | struct ConditionCacheContext; |
40 | | |
41 | | namespace io { |
42 | | struct IOContext; |
43 | | } // namespace io |
44 | | } // namespace doris |
45 | | |
46 | | namespace doris::format { |
47 | | |
48 | | class TableColumnMapper; |
49 | | struct TableColumnMapperOptions; |
50 | | |
51 | | // File-local single-column predicates for file-layer pruning, such as min/max, page index, |
52 | | // dictionary and bloom filter. Predicates must all belong to file_column_id. |
53 | | // These predicates are pruning hints only and are not row-level conjuncts. |
54 | | struct FileColumnPredicateFilter { |
55 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
56 | | std::vector<std::shared_ptr<ColumnPredicate>> predicates; |
57 | | |
58 | | std::string debug_string() const; |
59 | | }; |
60 | | |
// File format tags. The numeric values are spelled out so that they stay fixed even if the
// declaration order is ever touched; they equal the values the compiler assigned implicitly.
enum class FileFormat {
    PARQUET = 0,
    ORC = 1,
    CSV = 2,
    JSON = 3,
    TEXT = 4,
    JNI = 5,
    NATIVE = 6,
    ARROW = 7,
};
71 | | |
// File-local description of one scan: which columns to read, where each lands in the file-local
// output block, and which filters apply. FileReader::open() receives it as a
// std::shared_ptr<FileScanRequest>.
struct FileScanRequest {
    // Virtual so that a request can be destroyed through a FileScanRequest pointer.
    virtual ~FileScanRequest() = default;

    // Textual rendering of this request (declared here only).
    std::string debug_string() const;

    // Columns that must be read before row-level filtering. They are materialized eagerly because
    // conjuncts/delete_conjuncts need them to decide the selected rows.
    std::vector<LocalColumnIndex> predicate_columns;
    // Columns read after row-level filtering. Predicate columns are also available for output and
    // should not be duplicated here.
    std::vector<LocalColumnIndex> non_predicate_columns;
    // file-local column id -> file-local output block position. Shared by predicate_columns and
    // non_predicate_columns: both lists address the same output block.
    std::map<LocalColumnId, LocalIndex> local_positions;
    // Row-level filters converted to file-local expressions from table-level predicates.
    VExprContextSPtrs conjuncts;
    // Delete predicates converted to file-local expressions.
    VExprContextSPtrs delete_conjuncts;
    // Single-column predicates used only for file-layer pruning, such as statistics, page index,
    // dictionary and bloom filter. They must not be used for batch row-level filtering.
    std::vector<FileColumnPredicateFilter> column_predicate_filters;
};
93 | | |
94 | | // Helper for constructing the scan-column layout in FileScanRequest. |
95 | | // FileScanRequest keeps predicate and non-predicate columns separate because columnar readers such |
96 | | // as Parquet can read predicate columns first, filter rows, and then lazily read the remaining |
97 | | // projected columns. The two lists still share one file-local output block, whose positions are |
98 | | // stored in local_positions. This builder centralizes the mechanical rules for that shared layout: |
99 | | // - each root file column gets one stable block position; |
100 | | // - predicate columns dominate non-predicate columns because they are already returned in the file |
101 | | // block and can be reused for final materialization; |
102 | | // - repeated nested projections for the same root are merged instead of duplicated. |
103 | | // TableColumnMapper should still own table-to-file semantic resolution. This helper only owns the |
104 | | // FileScanRequest layout contract after a file-local projection has been produced. |
105 | | class FileScanRequestBuilder { |
106 | | public: |
107 | 186 | explicit FileScanRequestBuilder(FileScanRequest* request) : _request(request) { |
108 | 186 | DORIS_CHECK(_request != nullptr); |
109 | 186 | } |
110 | | |
111 | 77 | Status add_predicate_column(LocalColumnIndex projection) { |
112 | 77 | return _add_column(std::move(projection), &_request->predicate_columns, |
113 | 77 | /*is_predicate_column=*/true); |
114 | 77 | } |
115 | | |
116 | 116 | Status add_non_predicate_column(LocalColumnIndex projection) { |
117 | 116 | return _add_column(std::move(projection), &_request->non_predicate_columns, |
118 | 116 | /*is_predicate_column=*/false); |
119 | 116 | } |
120 | | |
121 | 15 | Status add_predicate_column(LocalColumnId column_id) { |
122 | 15 | return add_predicate_column(LocalColumnIndex::top_level(column_id)); |
123 | 15 | } |
124 | | |
125 | 24 | Status add_non_predicate_column(LocalColumnId column_id) { |
126 | 24 | return add_non_predicate_column(LocalColumnIndex::top_level(column_id)); |
127 | 24 | } |
128 | | |
129 | | private: |
130 | 182 | static LocalIndex _next_block_position(const FileScanRequest& request) { |
131 | 182 | size_t next_position = 0; |
132 | 182 | for (const auto& [_, block_position] : request.local_positions) { |
133 | 70 | next_position = std::max(next_position, block_position.value() + 1); |
134 | 70 | } |
135 | 182 | return LocalIndex(next_position); |
136 | 182 | } |
137 | | |
138 | 245 | static void _sort_projection_children_by_file_id(LocalColumnIndex* projection) { |
139 | 245 | DORIS_CHECK(projection != nullptr); |
140 | 245 | if (projection->project_all_children) { |
141 | 211 | return; |
142 | 211 | } |
143 | 50 | for (auto& child : projection->children) { |
144 | 50 | _sort_projection_children_by_file_id(&child); |
145 | 50 | } |
146 | 34 | std::ranges::sort(projection->children, |
147 | 34 | [](const LocalColumnIndex& lhs, const LocalColumnIndex& rhs) { |
148 | 23 | return lhs.local_id() < rhs.local_id(); |
149 | 23 | }); |
150 | 34 | } |
151 | | |
152 | | Status _add_column(LocalColumnIndex projection, std::vector<LocalColumnIndex>* scan_columns, |
153 | 193 | bool is_predicate_column) { |
154 | 193 | DORIS_CHECK(scan_columns != nullptr); |
155 | 193 | const auto file_column_id = projection.column_id(); |
156 | 193 | DORIS_CHECK(file_column_id != LocalColumnId::invalid()); |
157 | 193 | if (!is_predicate_column && |
158 | 193 | std::ranges::find_if(_request->predicate_columns, [&](const LocalColumnIndex& p) { |
159 | 6 | return p.column_id() == file_column_id; |
160 | 6 | }) != _request->predicate_columns.end()) { |
161 | 2 | return Status::OK(); |
162 | 2 | } |
163 | 191 | if (!_request->local_positions.contains(file_column_id)) { |
164 | 182 | _request->local_positions.emplace(file_column_id, _next_block_position(*_request)); |
165 | 182 | } |
166 | | |
167 | 191 | _sort_projection_children_by_file_id(&projection); |
168 | 191 | auto existing_projection_it = std::ranges::find_if( |
169 | 191 | *scan_columns, |
170 | 191 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
171 | 191 | if (existing_projection_it == scan_columns->end()) { |
172 | 187 | scan_columns->push_back(std::move(projection)); |
173 | 187 | } else { |
174 | 4 | RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection)); |
175 | 4 | _sort_projection_children_by_file_id(&*existing_projection_it); |
176 | 4 | } |
177 | | |
178 | 191 | if (is_predicate_column) { |
179 | 77 | auto it = std::ranges::find_if( |
180 | 77 | _request->non_predicate_columns, |
181 | 77 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
182 | 77 | if (it != _request->non_predicate_columns.end()) { |
183 | 4 | _request->non_predicate_columns.erase(it); |
184 | 4 | } |
185 | 77 | } |
186 | 191 | return Status::OK(); |
187 | 191 | } |
188 | | |
189 | | FileScanRequest* _request = nullptr; |
190 | | }; |
191 | | |
192 | | struct FileAggregateRequest { |
193 | | struct Column { |
194 | | // File-local projection for the aggregate column. For nested MIN/MAX, this points to the |
195 | | // single primitive leaf that can be represented by file statistics. For COUNT(col), this |
196 | | // points to the top-level column whose NULL-ness should be counted. |
197 | | LocalColumnIndex projection; |
198 | | }; |
199 | | |
200 | | TPushAggOp::type agg_type = TPushAggOp::type::NONE; |
201 | | // Empty for COUNT(*)/row-count pushdown. Non-empty for COUNT(col), where the file reader must |
202 | | // return the number of non-NULL rows for the requested column instead of total rows. |
203 | | std::vector<Column> columns; |
204 | | }; |
205 | | |
206 | | struct FileAggregateResult { |
207 | | struct Column { |
208 | | // Mirrors FileAggregateRequest::Column::projection so TableReader can put the returned |
209 | | // aggregate value back into the matching projected nested shape. |
210 | | LocalColumnIndex projection; |
211 | | bool has_min = false; |
212 | | bool has_max = false; |
213 | | Field min_value; |
214 | | Field max_value; |
215 | | }; |
216 | | |
217 | | int64_t count = 0; |
218 | | std::vector<Column> columns; |
219 | | }; |
220 | | |
221 | | /** |
222 | | * +-----> get_schema() -----------------+ |
223 | | * FileReader() -----> init() ----| -----> close() |
224 | | * +-----> open() -----> get_block() ----+ |
225 | | */ |
226 | | class FileReader { |
227 | | public: |
228 | | struct ReaderStatistics { |
229 | | int32_t filtered_row_groups = 0; |
230 | | int32_t filtered_row_groups_by_min_max = 0; |
231 | | int32_t filtered_row_groups_by_bloom_filter = 0; |
232 | | int32_t read_row_groups = 0; |
233 | | int64_t filtered_group_rows = 0; |
234 | | int64_t filtered_page_rows = 0; |
235 | | int64_t lazy_read_filtered_rows = 0; |
236 | | int64_t read_rows = 0; |
237 | | int64_t filtered_bytes = 0; |
238 | | int64_t column_read_time = 0; |
239 | | int64_t parse_meta_time = 0; |
240 | | int64_t parse_footer_time = 0; |
241 | | int64_t file_footer_read_calls = 0; |
242 | | int64_t file_footer_hit_cache = 0; |
243 | | int64_t file_reader_create_time = 0; |
244 | | int64_t open_file_num = 0; |
245 | | int64_t row_group_filter_time = 0; |
246 | | int64_t page_index_filter_time = 0; |
247 | | int64_t read_page_index_time = 0; |
248 | | int64_t parse_page_index_time = 0; |
249 | | int64_t predicate_filter_time = 0; |
250 | | int64_t dict_filter_rewrite_time = 0; |
251 | | int64_t bloom_filter_read_time = 0; |
252 | | }; |
253 | | |
254 | | FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties, |
255 | | std::unique_ptr<io::FileDescription>& file_description, |
256 | | std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile) |
257 | 206 | : _system_properties(system_properties), |
258 | 206 | _file_description(std::move(file_description)), |
259 | 206 | _io_ctx(io_ctx), |
260 | 206 | _profile(profile) {} |
261 | 206 | virtual ~FileReader() = default; |
262 | | |
263 | | // Initialize file reader and parse file metadata. |
264 | | virtual Status init(RuntimeState* state); |
265 | | |
266 | | // Set the maximum row count for the next physical read batch. Readers that do not batch by |
267 | | // rows may ignore it. |
268 | 0 | virtual void set_batch_size(size_t batch_size) { (void)batch_size; } |
269 | | |
270 | | // Get semantic file-local schema from file metadata. The file schema is determined by file |
271 | | // format and file content, and does not contain table/global schema semantics. A file reader may |
272 | | // expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier, |
273 | | // but it must not interpret table-format semantics such as Iceberg name mapping, |
274 | | // default/generated columns, or partition columns. File-format physical wrappers should be |
275 | | // normalized away before exposing this schema; for example, Parquet MAP is exposed as key/value |
276 | | // children rather than key_value/entry. |
277 | | // Doris plans external-table scan types as nullable, including all nested children of complex |
278 | | // types. This protects Doris from illegal or inconsistent values produced by external systems. |
279 | | // Therefore every ColumnDefinition::type returned here must be nullable. Complex types must |
280 | | // also expose nullable child types recursively, even if the physical file marks those fields as |
281 | | // required. |
282 | | // This method can only be called after init() successfully, but does not require open() to be |
283 | | // called. |
284 | | virtual Status get_schema(std::vector<ColumnDefinition>* file_schema) const = 0; |
285 | | |
286 | | // Create the mapper that matches this reader's scan-request capabilities. TableReader still |
287 | | // owns table-format semantics such as BY_NAME/BY_FIELD_ID/BY_INDEX, partition values and |
288 | | // default expressions; the FileReader only chooses whether file-local requests support columnar |
289 | | // lazy materialization/pruning or must materialize one flat list of required columns. |
290 | | virtual std::unique_ptr<TableColumnMapper> create_column_mapper( |
291 | | TableColumnMapperOptions options) const; |
292 | | |
293 | | // Open the file reader with file-local scan request. The file reader should initialize its internal state according to the request, but does not need to interpret table/global schema semantics. For example, all schema change, filter localization, default/generated/partition columns should be handled in table reader layer. This method can only be called after init() successfully. |
294 | 184 | virtual Status open(std::shared_ptr<FileScanRequest> request) { |
295 | 184 | _request = std::move(request); |
296 | 184 | return Status::OK(); |
297 | 184 | } |
298 | | |
299 | 0 | virtual Status get_block(Block* file_block, size_t* rows, bool* eof) { |
300 | 0 | if (rows != nullptr) { |
301 | 0 | *rows = 0; |
302 | 0 | } |
303 | 0 | if (eof != nullptr) { |
304 | 0 | *eof = true; |
305 | 0 | } |
306 | 0 | _eof = true; |
307 | 0 | return Status::OK(); |
308 | 0 | } |
309 | | |
310 | | virtual Status get_aggregate_result(const FileAggregateRequest& request, |
311 | 0 | FileAggregateResult* result) { |
312 | 0 | return Status::NotSupported("FileReader does not support aggregate pushdown"); |
313 | 0 | } |
314 | | |
315 | | // Condition cache is managed by TableReader and consumed by physical file readers. |
316 | | // On cache HIT, readers may skip granules whose cached bit is false before doing column IO. |
317 | | // On cache MISS, readers mark a granule true when row-level predicates keep at least one row |
318 | | // in that granule. Readers that cannot map batch rows to stable file-global row ids should |
319 | | // keep the default no-op implementation. |
320 | 0 | virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {} |
321 | | |
322 | | // Total rows covered by this physical reader. TableReader uses it to pre-size the miss bitmap. |
323 | | // Readers should return 0 if the metadata is unavailable or the row coordinate is unstable. |
324 | 0 | virtual int64_t get_total_rows() const { return 0; } |
325 | | |
326 | 67 | virtual Status close() { |
327 | 67 | _file_reader.reset(); |
328 | 67 | _tracing_file_reader.reset(); |
329 | 67 | _io_ctx.reset(); |
330 | 67 | _eof = true; |
331 | 67 | return Status::OK(); |
332 | 67 | } |
333 | | |
334 | | protected: |
335 | 10 | virtual void _init_profile() {} |
336 | | |
337 | | io::FileReaderSPtr _file_reader; |
338 | | // _tracing_file_reader wraps _file_reader. |
339 | | // _file_reader is original file reader. |
340 | | // _tracing_file_reader is tracing file reader with io context. |
341 | | // If io_ctx is null, _tracing_file_reader will be the same as file_reader. |
342 | | io::FileReaderSPtr _tracing_file_reader = nullptr; |
343 | | std::shared_ptr<FileScanRequest> _request; |
344 | | bool _eof = true; |
345 | | ReaderStatistics _reader_statistics; |
346 | | std::shared_ptr<io::FileSystemProperties> _system_properties; |
347 | | std::unique_ptr<io::FileDescription> _file_description; |
348 | | std::shared_ptr<io::IOContext> _io_ctx; |
349 | | RuntimeProfile* _profile = nullptr; |
350 | | }; |
351 | | |
352 | | } // namespace doris::format |