be/src/format/generic_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/PlanNodes_types.h>

#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/data_type/data_type.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "format/column_descriptor.h"
#include "format/table/table_schema_change_helper.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "storage/predicate/block_column_predicate.h"
#include "storage/segment/common.h"
#include "util/profile_collector.h"
44 | | |
namespace doris {
// Forward declaration only; no complete type is required at this point.
class ColumnPredicate;
} // namespace doris
48 | | |
49 | | namespace doris { |
50 | | |
// Forward declarations (incomplete types are sufficient for the declarations below).
class VSlotRef;
class Block;
53 | | |
/// Condition-cache state that FileScanner hands to a reader.
///  - MISS (is_hit == false): while evaluating predicates, the reader records per
///    granule in filter_result whether any row survived.
///  - HIT  (is_hit == true):  the reader skips every granule whose filter_result
///    entry is false.
struct ConditionCacheContext {
    bool is_hit {false};
    // One flag per granule: true means at least one row of that granule survived.
    std::shared_ptr<std::vector<bool>> filter_result;
    // Global granule index of the entry stored at (*filter_result)[0].
    int64_t base_granule {0};
    // Granule size (presumably a row count — confirm against the readers using it).
    static constexpr int GRANULE_SIZE {2048};
};
63 | | |
64 | | /// Base context for the unified init_reader(ReaderInitContext*) template method. |
65 | | /// Contains fields shared by ALL reader types. Format-specific readers define |
66 | | /// subclasses (ParquetInitContext, OrcInitContext, etc.) with extra fields. |
67 | | /// FileScanner allocates the appropriate subclass and populates the shared fields |
68 | | /// before calling init_reader(). |
69 | | struct ReaderInitContext { |
70 | 79.6k | virtual ~ReaderInitContext() = default; |
71 | | |
72 | | // ---- Owned by FileScanner, shared by all readers ---- |
73 | | std::vector<ColumnDescriptor>* column_descs = nullptr; |
74 | | std::unordered_map<std::string, uint32_t>* col_name_to_block_idx = nullptr; |
75 | | RuntimeState* state = nullptr; |
76 | | const TupleDescriptor* tuple_descriptor = nullptr; |
77 | | const RowDescriptor* row_descriptor = nullptr; |
78 | | const TFileScanRangeParams* params = nullptr; |
79 | | const TFileRangeDesc* range = nullptr; |
80 | | TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; |
81 | | |
82 | | // ---- Output slots (populated by on_before_init_reader, consumed by _do_init_reader) ---- |
83 | | // column_names: the list of file columns to read. Populated by on_before_init_reader |
84 | | // from column_descs (slot→name mapping). _do_init_reader uses this to configure the |
85 | | // format-specific parsing engine. For standalone readers (column_descs==nullptr), |
86 | | // callers populate column_names directly before calling init_reader. |
87 | | std::vector<std::string> column_names; |
88 | | std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node = |
89 | | TableSchemaChangeHelper::ConstNode::get_instance(); |
90 | | std::set<uint64_t> column_ids; |
91 | | std::set<uint64_t> filter_column_ids; |
92 | | }; |
93 | | |
/// Downcasts a ReaderInitContext (or other polymorphic base) pointer to the requested
/// subclass via dynamic_cast. If the cast yields nullptr — because the dynamic type does
/// not match, or because `ptr` itself is null — DORIS_CHECK fails (per Doris coding
/// standards this crashes rather than returning null). Callers therefore never need to
/// null-check the result.
template <typename To, typename From>
To* checked_context_cast(From* ptr) {
    To* const casted = dynamic_cast<To*>(ptr);
    DORIS_CHECK(casted != nullptr);
    return casted;
}
102 | | |
103 | | /// Base reader interface for all file readers. |
104 | | /// A GenericReader is responsible for reading a file and returning |
105 | | /// a set of blocks with specified schema. |
106 | | /// |
107 | | /// Provides hook virtual methods that implement the Template Method pattern: |
108 | | /// init_reader: _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader |
109 | | /// get_next_block: on_before_read_block → _do_get_next_block → on_after_read_block |
110 | | /// |
111 | | /// Column-filling logic (partition/missing/synthesized) lives in TableFormatReader. |
112 | | class GenericReader : public ProfileCollector { |
113 | | public: |
114 | 88.5k | GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {} |
115 | 86.2k | void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { |
116 | 86.2k | _push_down_agg_type = push_down_agg_type; |
117 | 86.2k | } |
118 | 71.9k | TPushAggOp::type get_push_down_agg_type() const { return _push_down_agg_type; } |
119 | | |
120 | | /// Template method for reading blocks. |
121 | | /// Calls: on_before_read_block → _do_get_next_block → on_after_read_block |
122 | 243k | Status get_next_block(Block* block, size_t* read_rows, bool* eof) { |
123 | 243k | RETURN_IF_ERROR(on_before_read_block(block)); |
124 | 243k | RETURN_IF_ERROR(_do_get_next_block(block, read_rows, eof)); |
125 | 243k | RETURN_IF_ERROR(on_after_read_block(block, read_rows)); |
126 | 243k | return Status::OK(); |
127 | 243k | } |
128 | | |
129 | | // Override this in readers that can adjust batch size between consecutive reads. |
130 | 0 | virtual void set_batch_size(size_t batch_size) {} |
131 | 0 | virtual size_t get_batch_size() const { return 0; } |
132 | | |
133 | | // Type is always nullable to process illegal values. |
134 | | // Results are cached after the first successful call. |
135 | 83.1k | Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type) { |
136 | 83.1k | if (_get_columns_cached) { |
137 | 8.33k | *name_to_type = _cached_name_to_type; |
138 | 8.33k | return Status::OK(); |
139 | 8.33k | } |
140 | 74.7k | RETURN_IF_ERROR(_get_columns_impl(name_to_type)); |
141 | 74.7k | _cached_name_to_type = *name_to_type; |
142 | 74.7k | _get_columns_cached = true; |
143 | | |
144 | 74.7k | return Status::OK(); |
145 | 74.7k | } |
146 | | |
147 | 0 | virtual Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) { |
148 | 0 | return Status::NotSupported("get_columns is not implemented"); |
149 | 0 | } |
150 | | |
151 | | // This method is responsible for initializing the resource for parsing schema. |
152 | | // It will be called before `get_parsed_schema`. |
153 | 0 | virtual Status init_schema_reader() { |
154 | 0 | return Status::NotSupported("init_schema_reader is not implemented for this reader."); |
155 | 0 | } |
156 | | // `col_types` is always nullable to process illegal values. |
157 | | virtual Status get_parsed_schema(std::vector<std::string>* col_names, |
158 | 0 | std::vector<DataTypePtr>* col_types) { |
159 | 0 | return Status::NotSupported("get_parsed_schema is not implemented for this reader."); |
160 | 0 | } |
161 | 88.7k | ~GenericReader() override = default; |
162 | | |
163 | 27.3k | virtual Status close() { return Status::OK(); } |
164 | | |
165 | 3.19k | Status read_by_rows(const std::list<int64_t>& row_ids) { |
166 | 3.19k | _read_by_rows = true; |
167 | 3.19k | _row_ids = row_ids; |
168 | 3.19k | return _set_read_one_line_impl(); |
169 | 3.19k | } |
170 | | |
171 | | /// The reader is responsible for counting the number of rows read, |
172 | | /// because some readers, such as parquet/orc, |
173 | | /// can skip some pages/rowgroups through indexes. |
174 | 20.4k | virtual bool count_read_rows() { return false; } |
175 | | |
176 | | /// Returns true if on_before_init_reader has already set _column_descs. |
177 | 66.0k | bool has_column_descs() const { return _column_descs != nullptr; } |
178 | | |
179 | | /// Unified initialization entry point (NVI pattern). |
180 | | /// Enforces the template method sequence for ALL readers: |
181 | | /// _open_file_reader → on_before_init_reader → _do_init_reader → on_after_init_reader |
182 | | /// Subclasses implement _open_file_reader and _do_init_reader(ReaderInitContext*). |
183 | | /// FileScanner constructs the appropriate ReaderInitContext subclass and calls this. |
184 | | /// |
185 | | /// NOTE: During migration, readers not yet ported to this API still use their |
186 | | /// format-specific init_reader(...) methods. This method is non-virtual so it |
187 | | /// cannot be accidentally overridden. |
188 | 79.6k | Status init_reader(ReaderInitContext* ctx) { |
189 | | // Apply push_down_agg_type early so _open_file_reader and _do_init_reader |
190 | | // can use it (e.g., PaimonCppReader skips full init on COUNT pushdown). |
191 | | // on_after_init_reader may reset this (e.g., Iceberg with equality deletes). |
192 | 79.6k | set_push_down_agg_type(ctx->push_down_agg_type); |
193 | | |
194 | 79.6k | RETURN_IF_ERROR(_open_file_reader(ctx)); |
195 | | |
196 | | // Standalone readers (delete file readers, push handler) set column_descs=nullptr |
197 | | // and pre-populate column_names directly. Skip hooks for them. |
198 | 79.6k | if (ctx->column_descs != nullptr) { |
199 | 74.4k | RETURN_IF_ERROR(on_before_init_reader(ctx)); |
200 | 74.4k | } |
201 | | |
202 | 79.6k | RETURN_IF_ERROR(_do_init_reader(ctx)); |
203 | | |
204 | 79.6k | if (ctx->column_descs != nullptr) { |
205 | 74.4k | RETURN_IF_ERROR(on_after_init_reader(ctx)); |
206 | 74.4k | } |
207 | | |
208 | 79.6k | return Status::OK(); |
209 | 79.6k | } |
210 | | |
211 | | /// Hook called before core init. Default just sets _column_descs. |
212 | | /// TableFormatReader overrides with partition/missing column computation. |
213 | | /// ORC/Parquet/Hive/Iceberg further override with format-specific schema matching. |
214 | 5.18k | virtual Status on_before_init_reader(ReaderInitContext* ctx) { |
215 | 5.18k | _column_descs = ctx->column_descs; |
216 | 5.18k | return Status::OK(); |
217 | 5.18k | } |
218 | | |
219 | | protected: |
220 | | // ---- Init-time hooks (Template Method for init_reader) ---- |
221 | | |
222 | | /// Opens the file and prepares I/O resources before hooks run. Override in |
223 | | /// subclasses to open files, read metadata, set up decompressors, etc. |
224 | | /// For Parquet/ORC, opens the file and reads footer metadata. |
225 | | /// For CSV/JSON, opens the file, creates decompressors, and sets up line readers. |
226 | | /// Default is no-op (for JNI, Native, Arrow readers). |
227 | 5.29k | virtual Status _open_file_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); } |
228 | | |
229 | | /// Core initialization (format-specific). Subclasses override to perform |
230 | | /// their actual parsing engine setup. The context should be downcast to |
231 | | /// the appropriate subclass using checked_context_cast<T>. |
232 | | /// Default returns NotSupported — readers not yet migrated to the unified |
233 | | /// init_reader(ReaderInitContext*) API still use their old init methods. |
234 | 0 | virtual Status _do_init_reader(ReaderInitContext* /*ctx*/) { |
235 | 0 | return Status::NotSupported( |
236 | 0 | "_do_init_reader(ReaderInitContext*) not yet implemented for this reader"); |
237 | 0 | } |
238 | | |
239 | | // ---- Existing init-time hooks ---- |
240 | | |
241 | | /// Called after core init completes. Subclasses override to process |
242 | | /// delete files, deletion vectors, etc. |
243 | 69.4k | virtual Status on_after_init_reader(ReaderInitContext* /*ctx*/) { return Status::OK(); } |
244 | | |
245 | | // ---- Read-time hooks ---- |
246 | | |
247 | | /// Called before reading a block. Subclasses override to modify block |
248 | | /// structure (e.g. add ACID columns, expand for equality delete). |
249 | 188k | virtual Status on_before_read_block(Block* block) { return Status::OK(); } |
250 | | |
251 | | /// Core block reading. Subclasses must override with actual read logic. |
252 | | virtual Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; |
253 | | |
254 | | /// Called after reading a block. Subclasses override to post-process |
255 | | /// (e.g. remove ACID columns, apply equality delete filter). |
256 | 15.2k | virtual Status on_after_read_block(Block* block, size_t* read_rows) { return Status::OK(); } |
257 | | |
258 | 0 | virtual Status _set_read_one_line_impl() { |
259 | 0 | return Status::NotSupported("read_by_rows is not implemented for this reader."); |
260 | 0 | } |
261 | | |
262 | | const size_t _MIN_BATCH_SIZE = 4064; // 4094 - 32(padding) |
263 | | |
264 | | // never let batch size be 0 because _do_get_next_block uses it as the |
265 | | // upper bound of a `while (block->rows() < batch_size)` loop and a 0 would make the reader |
266 | | // return without setting eof, causing the scanner to spin on empty blocks. |
267 | | const size_t _DEFAULT_BATCH_SIZE = 4064; // 4094 - 32(padding) |
268 | | TPushAggOp::type _push_down_agg_type {}; |
269 | | |
270 | | public: |
271 | | // Pass condition cache context to the reader for HIT/MISS tracking. |
272 | 0 | virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {} |
273 | | |
274 | | // Returns true if this reader can produce an accurate total row count from metadata |
275 | | // without reading actual data. Used to determine if CountReader decorator can be applied. |
276 | | // Only ORC and Parquet readers support this (via file footer metadata). |
277 | 1.03k | virtual bool supports_count_pushdown() const { return false; } |
278 | | |
279 | | // Returns the total number of rows the reader will produce. |
280 | | // Used to pre-allocate condition cache with the correct number of granules. |
281 | 1.80k | virtual int64_t get_total_rows() const { return 0; } |
282 | | |
283 | | // Returns true if this reader has delete operations (e.g. Iceberg position/equality deletes, |
284 | | // Hive ACID deletes). Used to disable condition cache when deletes are present, since cached |
285 | | // granule results may become stale if delete files change between queries. |
286 | 1.80k | virtual bool has_delete_operations() const { return false; } |
287 | | |
288 | | protected: |
289 | | bool _read_by_rows = false; |
290 | | std::list<int64_t> _row_ids; |
291 | | |
292 | | // Cache to save some common part such as file footer. |
293 | | // Maybe null if not used |
294 | | FileMetaCache* _meta_cache = nullptr; |
295 | | |
296 | | // ---- Column descriptors (set by init_reader, owned by FileScanner) ---- |
297 | | const std::vector<ColumnDescriptor>* _column_descs = nullptr; |
298 | | |
299 | | // ---- get_columns cache ---- |
300 | | bool _get_columns_cached = false; |
301 | | std::unordered_map<std::string, DataTypePtr> _cached_name_to_type; |
302 | | }; |
303 | | |
304 | | /// Provides an accessor for the current batch's row positions within the file. |
305 | | /// Implemented by RowGroupReader (Parquet) and OrcReader. |
306 | | class RowPositionProvider { |
307 | | public: |
308 | 70.7k | virtual ~RowPositionProvider() = default; |
309 | | virtual const std::vector<segment_v2::rowid_t>& current_batch_row_positions() const = 0; |
310 | | }; |
311 | | |
312 | | } // namespace doris |