be/src/format_v2/table_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bvar/status.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <exception> |
24 | | #include <map> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <string> |
28 | | #include <string_view> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/cast_set.h" |
33 | | #include "common/exception.h" |
34 | | #include "common/logging.h" |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/block/block.h" |
38 | | #include "core/column/column_array.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/column/column_map.h" |
41 | | #include "core/column/column_nullable.h" |
42 | | #include "core/column/column_struct.h" |
43 | | #include "core/column/column_vector.h" |
44 | | #include "core/data_type/data_type.h" |
45 | | #include "core/data_type/data_type_array.h" |
46 | | #include "core/data_type/data_type_map.h" |
47 | | #include "core/data_type/data_type_nullable.h" |
48 | | #include "core/data_type/data_type_number.h" |
49 | | #include "core/data_type/data_type_string.h" |
50 | | #include "core/data_type/data_type_struct.h" |
51 | | #include "core/field.h" |
52 | | #include "exec/common/stringop_substring.h" |
53 | | #include "exprs/vexpr.h" |
54 | | #include "exprs/vexpr_context.h" |
55 | | #include "exprs/vexpr_fwd.h" |
56 | | #include "exprs/vslot_ref.h" |
57 | | #include "format_v2/column_data.h" |
58 | | #include "format_v2/column_mapper.h" |
59 | | #include "format_v2/expr/cast.h" |
60 | | #include "format_v2/expr/delete_predicate.h" |
61 | | #include "format_v2/file_reader.h" |
62 | | #include "format_v2/parquet/reader/column_reader.h" |
63 | | #include "format_v2/schema_projection.h" |
64 | | #include "gen_cpp/PlanNodes_types.h" |
65 | | #include "runtime/descriptors.h" |
66 | | #include "storage/segment/condition_cache.h" |
67 | | |
68 | | namespace doris { |
69 | | class Block; |
70 | | class ColumnPredicate; |
71 | | struct DeleteFileDesc; |
72 | | class RuntimeState; |
73 | | } // namespace doris |
74 | | |
75 | | namespace doris::format { |
76 | | |
77 | | using DeleteRows = std::vector<int64_t>; |
78 | | |
79 | | // Row-level predicates on table/global schema. They are rewritten to file-local expressions when |
80 | | // possible, and remain the source of row-level filtering after localization. |
81 | | struct TableFilter { |
82 | | VExprContextSPtr conjunct; |
83 | | std::vector<GlobalIndex> global_indices; |
84 | | }; |
85 | | |
86 | | struct ScanTask { |
87 | 547 | virtual ~ScanTask() = default; |
88 | | |
89 | | std::unique_ptr<io::FileDescription> data_file; |
90 | | }; |
91 | | |
92 | | struct ProjectedColumnBuildContext { |
93 | | const TFileScanRangeParams* scan_params = nullptr; |
94 | | const TFileRangeDesc* range = nullptr; |
95 | | RuntimeState* runtime_state = nullptr; |
96 | | std::optional<ColumnDefinition> schema_column = std::nullopt; |
97 | | size_t next_file_column_idx = 0; |
98 | | }; |
99 | | |
100 | | struct ReadProfile { |
101 | | RuntimeProfile::Counter* num_delete_files = nullptr; |
102 | | RuntimeProfile::Counter* num_delete_rows = nullptr; |
103 | | RuntimeProfile::Counter* parse_delete_file_time = nullptr; |
104 | | RuntimeProfile::Counter* exec_timer = nullptr; |
105 | | RuntimeProfile::Counter* prepare_split_timer = nullptr; |
106 | | RuntimeProfile::Counter* finalize_timer = nullptr; |
107 | | RuntimeProfile::Counter* create_reader_timer = nullptr; |
108 | | RuntimeProfile::Counter* pushdown_agg_timer = nullptr; |
109 | | RuntimeProfile::Counter* open_reader_timer = nullptr; |
110 | | }; |
111 | | |
112 | | struct TableReadOptions { |
113 | | // Columns that need to be read from the file and output by the table reader. They all use |
114 | | // table/global schema semantics. |
115 | | const std::vector<ColumnDefinition> projected_columns; |
116 | | // Simple single-column predicates, which are parsed in the scan operator. |
117 | | const TableColumnPredicates column_predicates; |
118 | | // All complex conjuncts from the scan operator. |
119 | | const VExprContextSPtrs conjuncts; |
120 | | // File format of the underlying data files, needed for reader initialization and reader-level |
121 | | // filter pushdown. |
122 | | const FileFormat format; |
123 | | TFileScanRangeParams* scan_params; |
124 | | std::shared_ptr<io::IOContext> io_ctx; |
125 | | RuntimeState* runtime_state; |
126 | | RuntimeProfile* scanner_profile; |
127 | | // File formats without complete self-describing metadata, such as CSV, Text, and JSON, need |
128 | | // the FE-planned physical file slots to build their file-local schema and deserialize values. |
129 | | const std::vector<SlotDescriptor*>* file_slot_descs = nullptr; |
130 | | // Push-down aggregate type. |
131 | | const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; |
132 | | // Digest of stable pushed-down predicates. A zero digest disables condition cache. |
133 | | uint64_t condition_cache_digest = 0; |
134 | | }; |
135 | | |
136 | | struct SplitReadOptions { |
137 | | // Split-level information for reader initialization, which may include the file path, partition values, delete file info, etc. The content is table-format specific and opaque to the table reader base class; the concrete table reader implementation is responsible for parsing the information needed for reader initialization and filter pushdown. |
138 | | std::map<std::string, Field> partition_values; |
139 | | ShardedKVCache* cache; |
140 | | TFileRangeDesc current_range; |
141 | | FileFormat current_split_format = FileFormat::PARQUET; |
142 | | std::optional<GlobalRowIdContext> global_rowid_context; |
143 | | }; |
144 | | |
145 | | // Base class for table-level readers. |
146 | | // This layer owns common table-level orchestration, such as split iteration, dynamic partition |
147 | | // pruning, delete handling and conversion from file-local blocks to table-schema blocks. Concrete |
148 | | // table-format readers only need to provide format-specific hooks for opening readers and parsing |
149 | | // split metadata. |
150 | | class TableReader { |
151 | | public: |
152 | 565 | virtual ~TableReader() = default; |
153 | | |
154 | | // Initialize common runtime options for the table reader. Subclasses may call this from their |
155 | | // own init(options); table-format schema and split metadata are provided later per split. |
156 | | virtual Status init(TableReadOptions&& options); |
157 | | |
158 | | // FileScannerV2 adjusts this before each get_block() using an adaptive bytes-per-row estimate. |
159 | | // Store it here as well as forwarding to the current reader so newly opened split readers start |
160 | | // with the latest predicted batch size. |
161 | 1.11k | void set_batch_size(size_t batch_size) { |
162 | 1.11k | _batch_size = std::max<size_t>(1, batch_size); |
163 | 1.11k | if (_data_reader.reader != nullptr) { |
164 | 648 | _data_reader.reader->set_batch_size(_batch_size); |
165 | 648 | } |
166 | 1.11k | } |
167 | | |
168 | | // Prepare for reading a new split/task. |
169 | | // 1. Pass a new split/task to the reader; it is used by the subsequent open_reader() call to initialize the underlying file reader. |
170 | | // 2. Parse delete predicates from the split/task information; they are used later for dynamic filtering and delete handling. |
171 | | virtual Status prepare_split(const SplitReadOptions& options); |
172 | | |
173 | | // Public entry point for reading a table-schema block. The base class opens the current reader, |
174 | | // advances across EOF, and closes exhausted readers. Subclasses provide protected hooks for |
175 | | // table-format-specific behavior. |
176 | 1.20k | virtual Status get_block(Block* block, bool* eos) { |
177 | 1.20k | SCOPED_TIMER(_profile.exec_timer); |
178 | 1.20k | DORIS_CHECK(block->columns() == _projected_columns.size()); |
179 | 1.20k | block->clear_column_data(_projected_columns.size()); |
180 | | |
181 | 1.68k | while (true) { |
182 | 1.68k | if (*eos) { |
183 | 0 | return Status::OK(); |
184 | 0 | } |
185 | 1.68k | if (!_data_reader.reader) { |
186 | 1.01k | if (_is_table_level_count_active()) { |
187 | 6 | RETURN_IF_ERROR(_read_table_level_count(block, eos)); |
188 | 6 | return Status::OK(); |
189 | 6 | } |
190 | 1.01k | RETURN_IF_ERROR(create_next_reader(eos)); |
191 | 1.01k | if (!_data_reader.reader) { |
192 | 479 | DCHECK(*eos); |
193 | 479 | return Status::OK(); |
194 | 479 | } |
195 | 1.01k | } |
196 | | |
197 | | // Materialize a reduced row set for upper aggregate operators when aggregate |
198 | | // pushdown can be applied. This is not the final aggregate result: COUNT emits |
199 | | // `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows containing |
200 | | // file-level min/max values for the upper MIN/MAX. |
201 | 1.19k | if (!_aggregate_pushdown_tried) { |
202 | 531 | SCOPED_TIMER(_profile.pushdown_agg_timer); |
203 | 531 | bool pushed_down = false; |
204 | 531 | RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down)); |
205 | 531 | if (pushed_down) { |
206 | 6 | return Status::OK(); |
207 | 6 | } |
208 | 531 | } |
209 | | |
210 | 1.19k | bool current_eof = false; |
211 | 1.19k | _data_reader.block_template.clear_column_data( |
212 | 1.19k | cast_set<int64_t>(_data_reader.file_block_layout.size())); |
213 | 1.19k | size_t current_rows = 0; |
214 | 1.19k | RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template, |
215 | 1.19k | &current_rows, &current_eof)); |
216 | 1.19k | if (current_rows == 0) { |
217 | 477 | if (current_eof) { |
218 | 477 | _current_reader_reached_eof = true; |
219 | 477 | RETURN_IF_ERROR(close_current_reader()); |
220 | 477 | } |
221 | 477 | continue; |
222 | 477 | } |
223 | 1.19k | DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.file_block_layout.size()) |
224 | 0 | << _data_reader.block_template.dump_structure(); |
225 | 713 | #ifndef NDEBUG |
226 | 713 | RETURN_IF_ERROR(_check_file_block_columns("after file reader get_block", current_rows)); |
227 | 713 | #endif |
228 | 713 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
229 | 713 | RETURN_IF_ERROR(finalize_chunk(block, current_rows)); |
230 | 713 | #ifndef NDEBUG |
231 | 713 | RETURN_IF_ERROR( |
232 | 713 | _check_table_block_columns("after finalize_chunk", block, current_rows)); |
233 | 713 | #endif |
234 | 713 | if (current_eof) { |
235 | 6 | _current_reader_reached_eof = true; |
236 | 6 | RETURN_IF_ERROR(close_current_reader()); |
237 | 6 | } |
238 | 713 | return Status::OK(); |
239 | 713 | } |
240 | 1.20k | } |
241 | | |
242 | | // Close the table reader and the currently active file reader. Subclasses that hold additional |
243 | | // table-format resources should override this and call TableReader::close() first. |
244 | 531 | virtual Status close() { |
245 | 531 | if (_data_reader.reader) { |
246 | 42 | RETURN_IF_ERROR(close_current_reader()); |
247 | 42 | } |
248 | 531 | _current_task.reset(); |
249 | 531 | _current_file_description.reset(); |
250 | 531 | _remaining_table_level_count = -1; |
251 | 531 | return Status::OK(); |
252 | 531 | } |
253 | | |
254 | 931 | int64_t condition_cache_hit_count() const { return _condition_cache_hit_count; } |
255 | | |
256 | | virtual std::string debug_string() const; |
257 | | |
258 | | virtual Status annotate_projected_column(const TFileScanSlotInfo& slot_info, |
259 | | ProjectedColumnBuildContext* context, |
260 | | ColumnDefinition* column) const; |
261 | | |
262 | 464 | virtual Status validate_projected_columns(const ProjectedColumnBuildContext& context) const { |
263 | 464 | (void)context; |
264 | 464 | return Status::OK(); |
265 | 464 | } |
266 | | |
267 | | protected: |
268 | | // Parse deletion vector information from table format specific file description. |
269 | | virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, |
270 | 519 | DeleteFileDesc* desc, bool* has_delete_file) { |
271 | 519 | *has_delete_file = false; |
272 | 519 | return Status::OK(); |
273 | 519 | } |
274 | | |
275 | | // Advance to the next reader. This closes the current reader first and then opens the next |
276 | | // concrete reader. Subclasses should not duplicate this loop. |
277 | | Status create_next_reader(bool* eos); |
278 | | virtual Status create_file_reader(std::unique_ptr<FileReader>* reader); |
279 | 512 | virtual TableColumnMappingMode mapping_mode() const { return TableColumnMappingMode::BY_NAME; } |
280 | 531 | virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) { |
281 | 531 | DORIS_CHECK(file_schema != nullptr); |
282 | 531 | return Status::OK(); |
283 | 531 | } |
284 | | |
285 | | // Open the concrete reader for the current split/task and build the file-local scan request. |
286 | 532 | virtual Status open_reader() { |
287 | 532 | SCOPED_TIMER(_profile.open_reader_timer); |
288 | | // 1. Get file schema and create column mapping. |
289 | 532 | std::vector<ColumnDefinition> file_schema; |
290 | 532 | RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema)); |
291 | | // For Paimon/Hudi, FE can provide field ids through `history_schema_info`. Annotate the |
292 | | // file schema before column mapping when the table format maps columns by field id. |
293 | 532 | RETURN_IF_ERROR(annotate_file_schema(&file_schema)); |
294 | 532 | _data_reader.file_schema = file_schema; |
295 | 532 | _mapper_options.mode = mapping_mode(); |
296 | | |
297 | 532 | _data_reader.column_mapper = _data_reader.reader->create_column_mapper(_mapper_options); |
298 | 532 | DORIS_CHECK(_data_reader.column_mapper != nullptr); |
299 | 532 | RETURN_IF_ERROR(_data_reader.column_mapper->create_mapping(_projected_columns, |
300 | 532 | _partition_values, file_schema)); |
301 | 532 | DORIS_CHECK(_data_reader.column_mapper->mappings().size() == _projected_columns.size()); |
302 | | |
303 | | // 2. Build table filters based on conjuncts and column predicates. |
304 | 532 | RETURN_IF_ERROR(_build_table_filters_from_conjuncts()); |
305 | | |
306 | | // 3. Create file scan request based on column mapping and table filters, then open file |
307 | | // reader with the request. File scan request carries row-level expression filters and |
308 | | // file-level pruning hints. Only expression filters decide returned rows; column predicates |
309 | | // are pruning hints. |
310 | 532 | auto file_request = std::make_shared<FileScanRequest>(); |
311 | 532 | RETURN_IF_ERROR(_data_reader.column_mapper->create_scan_request( |
312 | 532 | _table_filters, _table_column_predicates, _projected_columns, file_request.get(), |
313 | 532 | _runtime_state)); |
314 | 532 | bool constant_filter_pruned_split = false; |
315 | 532 | RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split)); |
316 | 532 | if (constant_filter_pruned_split) { |
317 | 1 | RETURN_IF_ERROR(close_current_reader()); |
318 | 1 | return Status::OK(); |
319 | 1 | } |
320 | 531 | RETURN_IF_ERROR(customize_file_scan_request(file_request.get())); |
321 | 531 | RETURN_IF_ERROR(_open_local_filter_exprs(*file_request)); |
322 | 531 | _data_reader.file_block_layout.clear(); |
323 | 531 | _data_reader.block_template.clear(); |
324 | 531 | _data_reader.file_block_layout.resize(file_request->local_positions.size()); |
325 | | |
326 | | // 4. Build file block layout from file schema and column mapping. The layout describes |
327 | | // the block returned by file reader before table-column materialization. |
328 | 3.16k | for (const auto& [file_column_id, block_position] : file_request->local_positions) { |
329 | 3.16k | DORIS_CHECK(block_position.value() < _data_reader.file_block_layout.size()); |
330 | 3.16k | const auto* field = _find_column_definition(_data_reader.file_schema, file_column_id); |
331 | 3.16k | DORIS_CHECK(field != nullptr); |
332 | | |
333 | 3.16k | ColumnDefinition projected_field; |
334 | 3.16k | { |
335 | 3.16k | auto it = std::find_if( |
336 | 3.16k | file_request->non_predicate_columns.begin(), |
337 | 3.16k | file_request->non_predicate_columns.end(), |
338 | 22.6k | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
339 | 3.16k | if (it != file_request->non_predicate_columns.end()) { |
340 | 3.13k | RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field)); |
341 | 3.13k | } |
342 | 3.16k | } |
343 | 3.16k | { |
344 | 3.16k | auto it = std::find_if( |
345 | 3.16k | file_request->predicate_columns.begin(), |
346 | 3.16k | file_request->predicate_columns.end(), |
347 | 3.16k | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
348 | 3.16k | if (it != file_request->predicate_columns.end()) { |
349 | 31 | RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field)); |
350 | 31 | } |
351 | 3.16k | } |
352 | 3.16k | _data_reader.file_block_layout[block_position.value()] = { |
353 | 3.16k | .file_column_id = file_column_id, |
354 | 3.16k | .name = projected_field.name, |
355 | 3.16k | .type = projected_field.type, |
356 | 3.16k | }; |
357 | 3.16k | DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != nullptr); |
358 | 3.16k | } |
359 | | |
360 | | // 5. Prepare block template from file block layout. The block template stores the block |
361 | | // returned by file reader before table-column materialization. |
362 | 531 | _data_reader.block_template.reserve(_data_reader.file_block_layout.size()); |
363 | 3.16k | for (const auto& column : _data_reader.file_block_layout) { |
364 | 3.16k | _data_reader.block_template.insert( |
365 | 3.16k | {column.type->create_column(), column.type, column.name}); |
366 | 3.16k | } |
367 | 531 | if (VLOG_DEBUG_IS_ON) { |
368 | 0 | VLOG_DEBUG << "TableReader debug: " << debug_string(); |
369 | 0 | } |
370 | 531 | RETURN_IF_ERROR(_open_mapping_exprs()); |
371 | 531 | RETURN_IF_ERROR(_data_reader.reader->open(file_request)); |
372 | 531 | RETURN_IF_ERROR(_init_reader_condition_cache(*file_request)); |
373 | 531 | return Status::OK(); |
374 | 531 | } |
375 | | |
376 | | Status _build_table_filters_from_conjuncts(); |
377 | | Status _open_local_filter_exprs(const FileScanRequest& file_request); |
378 | | Status _init_reader_condition_cache(const FileScanRequest& file_request); |
379 | | void _finalize_reader_condition_cache(); |
380 | | bool _should_enable_condition_cache(const FileScanRequest& file_request) const; |
381 | | |
382 | 532 | Status _evaluate_constant_filters(bool* can_filter_all) { |
383 | 532 | DORIS_CHECK(can_filter_all != nullptr); |
384 | 532 | *can_filter_all = false; |
385 | 532 | for (const auto& table_filter : _table_filters) { |
386 | 29 | if (table_filter.conjunct == nullptr || |
387 | | // RuntimeFilterExpr does not implement execute_column_impl(); it is evaluated by |
388 | | // the row-level filter path through execute_filter(). Constant split pruning uses |
389 | | // VExprContext::execute() on a one-row synthetic block, so runtime filters must not |
390 | | // be pre-executed here even when their referenced slot maps to a constant value. |
391 | 29 | table_filter.conjunct->root()->is_rf_wrapper() || |
392 | 29 | !_table_filter_has_only_constant_entries(table_filter)) { |
393 | 27 | continue; |
394 | 27 | } |
395 | 2 | Block eval_block; |
396 | 2 | RETURN_IF_ERROR(_build_constant_filter_block(table_filter, &eval_block)); |
397 | 2 | RowDescriptor row_desc; |
398 | 2 | RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, row_desc)); |
399 | 2 | RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state)); |
400 | 2 | int result_column_id = -1; |
401 | 2 | RETURN_IF_ERROR(table_filter.conjunct->execute(&eval_block, &result_column_id)); |
402 | 2 | DORIS_CHECK(result_column_id >= 0); |
403 | 2 | if (_filter_result_filters_all(eval_block.get_by_position(result_column_id).column)) { |
404 | 1 | *can_filter_all = true; |
405 | 1 | return Status::OK(); |
406 | 1 | } |
407 | 2 | } |
408 | 531 | return Status::OK(); |
409 | 532 | } |
410 | | |
411 | 27 | bool _table_filter_has_only_constant_entries(const TableFilter& table_filter) const { |
412 | 27 | const auto& filter_entries = _data_reader.column_mapper->filter_entries(); |
413 | 27 | for (const auto global_index : table_filter.global_indices) { |
414 | 27 | const auto entry_it = filter_entries.find(global_index); |
415 | 27 | if (entry_it == filter_entries.end() || !entry_it->second.is_constant()) { |
416 | 25 | return false; |
417 | 25 | } |
418 | 27 | } |
419 | 2 | return !table_filter.global_indices.empty(); |
420 | 27 | } |
421 | | |
422 | 2 | Status _build_constant_filter_block(const TableFilter& table_filter, Block* eval_block) { |
423 | 2 | DORIS_CHECK(eval_block != nullptr); |
424 | 2 | eval_block->clear(); |
425 | 2 | const auto& mappings = _data_reader.column_mapper->mappings(); |
426 | 2 | const auto& filter_entries = _data_reader.column_mapper->filter_entries(); |
427 | 2 | DORIS_CHECK(mappings.size() == _projected_columns.size()); |
428 | 4 | for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) { |
429 | 2 | const auto global_index = GlobalIndex(column_idx); |
430 | 2 | const auto& mapping = mappings[column_idx]; |
431 | 2 | const auto entry_it = filter_entries.find(global_index); |
432 | 2 | const bool referenced_by_filter = |
433 | 2 | std::find(table_filter.global_indices.begin(), |
434 | 2 | table_filter.global_indices.end(), |
435 | 2 | global_index) != table_filter.global_indices.end(); |
436 | 2 | if (referenced_by_filter && entry_it != filter_entries.end() && |
437 | 2 | entry_it->second.is_constant()) { |
438 | 2 | ColumnPtr constant_column; |
439 | 2 | RETURN_IF_ERROR(_materialize_constant_filter_column( |
440 | 2 | entry_it->second.constant_index(), &constant_column)); |
441 | 2 | eval_block->insert({std::move(constant_column), mapping.table_type, |
442 | 2 | mapping.table_column_name}); |
443 | 2 | } else { |
444 | 0 | eval_block->insert({mapping.table_type->create_column_const_with_default_value(1), |
445 | 0 | mapping.table_type, mapping.table_column_name}); |
446 | 0 | } |
447 | 2 | } |
448 | 2 | return Status::OK(); |
449 | 2 | } |
450 | | |
451 | 2 | Status _materialize_constant_filter_column(ConstantIndex constant_index, ColumnPtr* column) { |
452 | 2 | DORIS_CHECK(column != nullptr); |
453 | 2 | const auto& constant_entry = _data_reader.column_mapper->constant_map().get(constant_index); |
454 | 2 | DORIS_CHECK(constant_entry.expr != nullptr); |
455 | 2 | DORIS_CHECK(constant_entry.type != nullptr); |
456 | 2 | RowDescriptor row_desc; |
457 | 2 | RETURN_IF_ERROR(constant_entry.expr->prepare(_runtime_state, row_desc)); |
458 | 2 | RETURN_IF_ERROR(constant_entry.expr->open(_runtime_state)); |
459 | 2 | Block eval_block; |
460 | 2 | eval_block.insert({constant_entry.type->create_column_const_with_default_value(1), |
461 | 2 | constant_entry.type, "__table_reader_constant_filter"}); |
462 | 2 | int result_column_id = -1; |
463 | 2 | RETURN_IF_ERROR(constant_entry.expr->execute(&eval_block, &result_column_id)); |
464 | 2 | DORIS_CHECK(result_column_id >= 0); |
465 | 2 | *column = eval_block.get_by_position(result_column_id).column; |
466 | 2 | DORIS_CHECK((*column)->size() == 1); |
467 | 2 | return Status::OK(); |
468 | 2 | } |
469 | | |
470 | 2 | static bool _filter_result_filters_all(const ColumnPtr& filter_column) { |
471 | 2 | DORIS_CHECK(filter_column.get() != nullptr); |
472 | 2 | DORIS_CHECK(filter_column->size() == 1); |
473 | 2 | return !filter_column->get_bool(0); |
474 | 2 | } |
475 | | |
476 | 532 | virtual Status customize_file_scan_request(FileScanRequest* file_request) { |
477 | 532 | return _append_delete_predicate(file_request); |
478 | 532 | } |
479 | | |
480 | 1.59k | bool _is_table_level_count_active() const { return _remaining_table_level_count >= 0; } |
481 | | |
482 | 6 | Status _materialize_count_rows(size_t rows, Block* block) const { |
483 | 6 | DORIS_CHECK(block != nullptr); |
484 | 6 | DORIS_CHECK(block->columns() > 0 || rows == 0); |
485 | 12 | for (size_t column_idx = 0; column_idx < block->columns(); ++column_idx) { |
486 | 6 | auto column = block->get_by_position(column_idx).type->create_column(); |
487 | 6 | column->resize(rows); |
488 | 6 | block->replace_by_position(column_idx, std::move(column)); |
489 | 6 | } |
490 | 6 | return Status::OK(); |
491 | 6 | } |
492 | | |
493 | 6 | Status _read_table_level_count(Block* block, bool* eos) { |
494 | 6 | DORIS_CHECK(block != nullptr); |
495 | 6 | DORIS_CHECK(eos != nullptr); |
496 | 6 | DORIS_CHECK(_push_down_agg_type == TPushAggOp::type::COUNT); |
497 | 6 | DORIS_CHECK(_remaining_table_level_count >= 0); |
498 | 6 | if (_remaining_table_level_count == 0) { |
499 | 2 | _remaining_table_level_count = -1; |
500 | 2 | _current_task.reset(); |
501 | 2 | *eos = true; |
502 | 2 | return Status::OK(); |
503 | 2 | } |
504 | | |
505 | 4 | const int64_t batch_size = _runtime_state == nullptr |
506 | 4 | ? _remaining_table_level_count |
507 | 4 | : static_cast<int64_t>(_runtime_state->batch_size()); |
508 | 4 | const auto rows = std::min(_remaining_table_level_count, batch_size); |
509 | 4 | RETURN_IF_ERROR(_materialize_count_rows(cast_set<size_t>(rows), block)); |
510 | 4 | _remaining_table_level_count -= rows; |
511 | 4 | *eos = false; |
512 | 4 | return Status::OK(); |
513 | 4 | } |
514 | | |
515 | | void _append_file_scan_column(FileScanRequest* request, LocalColumnId column_id, |
516 | 24 | std::vector<LocalColumnIndex>* scan_columns) { |
517 | 24 | DORIS_CHECK(request != nullptr); |
518 | 24 | DORIS_CHECK(scan_columns != nullptr); |
519 | 24 | FileScanRequestBuilder builder(request); |
520 | 24 | Status status; |
521 | 24 | if (scan_columns == &request->predicate_columns) { |
522 | 13 | status = builder.add_predicate_column(column_id); |
523 | 13 | } else { |
524 | 11 | DORIS_CHECK(scan_columns == &request->non_predicate_columns); |
525 | 11 | status = builder.add_non_predicate_column(column_id); |
526 | 11 | } |
527 | 24 | DORIS_CHECK(status.ok()) << status.to_string(); |
528 | 24 | if (column_id == LocalColumnId(ROW_POSITION_COLUMN_ID) && |
529 | 24 | _find_column_definition(_data_reader.file_schema, column_id) == nullptr) { |
530 | 17 | _data_reader.file_schema.push_back(row_position_column_definition()); |
531 | 17 | } |
532 | 24 | } |
533 | | |
534 | | // Append a DeletePredicate to the file scan request if there are deletes. The predicate is evaluated at the file reader level and filters out deleted rows before data is returned to the table reader. |
535 | 532 | Status _append_delete_predicate(FileScanRequest* request) { |
536 | 532 | DORIS_CHECK(request != nullptr); |
537 | 532 | if (_delete_rows == nullptr || _delete_rows->empty()) { |
538 | 522 | return Status::OK(); |
539 | 522 | } |
540 | 10 | const auto row_position_column_id = LocalColumnId(ROW_POSITION_COLUMN_ID); |
541 | 10 | _append_file_scan_column(request, row_position_column_id, &request->predicate_columns); |
542 | | |
543 | 10 | auto delete_predicate = std::make_shared<DeletePredicate>(*_delete_rows); |
544 | 10 | const auto block_position = request->local_positions.at(row_position_column_id); |
545 | 10 | delete_predicate->add_child(VSlotRef::create_shared( |
546 | 10 | cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1, |
547 | 10 | std::make_shared<DataTypeInt64>(), ROW_POSITION_COLUMN_NAME)); |
548 | | |
549 | 10 | request->delete_conjuncts.push_back( |
550 | 10 | VExprContext::create_shared(std::move(delete_predicate))); |
551 | 10 | return Status::OK(); |
552 | 532 | } |
553 | | |
554 | | // Close the current concrete reader. This hook is called by both create_next_reader() and |
555 | | // close(), so it should remain idempotent. |
556 | 532 | virtual Status close_current_reader() { |
557 | 532 | _finalize_reader_condition_cache(); |
558 | 532 | RETURN_IF_ERROR(_data_reader.reader->close()); |
559 | 532 | _data_reader.reader.reset(); |
560 | 532 | if (_data_reader.column_mapper != nullptr) { |
561 | 532 | _data_reader.column_mapper->clear(); |
562 | 532 | _data_reader.column_mapper.reset(); |
563 | 532 | } |
564 | 532 | _table_filters.clear(); |
565 | 532 | _data_reader.file_schema.clear(); |
566 | 532 | _data_reader.file_block_layout.clear(); |
567 | 532 | _data_reader.block_template.clear(); |
568 | 532 | _current_task.reset(); |
569 | 532 | _current_file_description.reset(); |
570 | 532 | _current_reader_reached_eof = false; |
571 | 532 | return Status::OK(); |
572 | 532 | } |
573 | | |
574 | | // Finalize file-local block to table/global schema block. |
575 | 713 | Status finalize_chunk(Block* block, const size_t rows) { |
576 | 713 | SCOPED_TIMER(_profile.finalize_timer); |
577 | 713 | size_t idx = 0; |
578 | 6.08k | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
579 | 6.08k | ColumnPtr column; |
580 | 6.08k | RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, rows, |
581 | 6.08k | &column)); |
582 | 6.08k | block->replace_by_position(idx, IColumn::mutate(std::move(column))); |
583 | 6.08k | idx++; |
584 | 6.08k | } |
585 | 713 | RETURN_IF_ERROR(materialize_virtual_columns(block)); |
586 | | // Enforce CHAR/VARCHAR length declared by the table schema after all file-to-table |
587 | | // materialization has finished. |
588 | 713 | RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); |
589 | 713 | return Status::OK(); |
590 | 713 | } |
591 | | |
592 | | // Materialize virtual columns in the table block, such as Iceberg _row_id and |
593 | | // _last_updated_sequence_number. This runs after normal column materialization so finalize |
594 | | // expressions can reference those virtual columns. |
595 | 694 | virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); } |
596 | | |
597 | | #ifndef NDEBUG |
598 | 713 | Status _check_file_block_columns(std::string_view stage, size_t rows) { |
599 | 713 | DORIS_CHECK(_data_reader.block_template.columns() == _data_reader.file_block_layout.size()); |
600 | 6.79k | for (size_t idx = 0; idx < _data_reader.block_template.columns(); ++idx) { |
601 | 6.08k | const auto& file_block_column = _data_reader.file_block_layout[idx]; |
602 | 6.08k | const auto& column_with_type = _data_reader.block_template.get_by_position(idx); |
603 | 6.08k | const auto* column = column_with_type.column.get(); |
604 | 6.08k | try { |
605 | 6.08k | if (column == nullptr) { |
606 | 0 | auto st = Status::InternalError( |
607 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
608 | 0 | "type={}, column=null, expected_rows={}, reader={}", |
609 | 0 | idx, stage, file_block_column.file_column_id.value(), |
610 | 0 | file_block_column.name, |
611 | 0 | file_block_column.type == nullptr ? "null" |
612 | 0 | : file_block_column.type->get_name(), |
613 | 0 | rows, debug_string()); |
614 | 0 | LOG(WARNING) << st; |
615 | 0 | return st; |
616 | 0 | } |
617 | 6.08k | column->sanity_check(); |
618 | 6.08k | auto st = column_with_type.check_type_and_column_match(); |
619 | 6.08k | if (!st.ok()) { |
620 | 0 | auto contextual_status = Status::InternalError( |
621 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
622 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
623 | 0 | "reader={}", |
624 | 0 | idx, stage, file_block_column.file_column_id.value(), |
625 | 0 | file_block_column.name, |
626 | 0 | file_block_column.type == nullptr ? "null" |
627 | 0 | : file_block_column.type->get_name(), |
628 | 0 | column->get_name(), column->size(), rows, st.to_string(), |
629 | 0 | debug_string()); |
630 | 0 | LOG(WARNING) << contextual_status; |
631 | 0 | return contextual_status; |
632 | 0 | } |
633 | 6.08k | } catch (const Exception& e) { |
634 | 0 | auto st = Status::InternalError( |
635 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
636 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
637 | 0 | "reader={}", |
638 | 0 | idx, stage, file_block_column.file_column_id.value(), |
639 | 0 | file_block_column.name, |
640 | 0 | file_block_column.type == nullptr ? "null" |
641 | 0 | : file_block_column.type->get_name(), |
642 | 0 | column == nullptr ? "null" : column->get_name(), |
643 | 0 | column == nullptr ? 0 : column->size(), rows, e.to_string(), |
644 | 0 | debug_string()); |
645 | 0 | LOG(WARNING) << st; |
646 | 0 | return st; |
647 | 0 | } catch (const std::exception& e) { |
648 | 0 | auto st = Status::InternalError( |
649 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
650 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
651 | 0 | "reader={}", |
652 | 0 | idx, stage, file_block_column.file_column_id.value(), |
653 | 0 | file_block_column.name, |
654 | 0 | file_block_column.type == nullptr ? "null" |
655 | 0 | : file_block_column.type->get_name(), |
656 | 0 | column == nullptr ? "null" : column->get_name(), |
657 | 0 | column == nullptr ? 0 : column->size(), rows, e.what(), debug_string()); |
658 | 0 | LOG(WARNING) << st; |
659 | 0 | return st; |
660 | 0 | } |
661 | 6.08k | } |
662 | 713 | return Status::OK(); |
663 | 713 | } |
664 | | |
665 | 713 | Status _check_table_block_columns(std::string_view stage, const Block* block, size_t rows) { |
666 | 713 | DORIS_CHECK(block != nullptr); |
667 | 713 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
668 | 6.79k | for (size_t idx = 0; idx < block->columns(); ++idx) { |
669 | 6.08k | const auto& mapping = _data_reader.column_mapper->mappings()[idx]; |
670 | 6.08k | const auto& column_with_type = block->get_by_position(idx); |
671 | 6.08k | const auto* column = column_with_type.column.get(); |
672 | 6.08k | try { |
673 | 6.08k | if (column == nullptr) { |
674 | 0 | auto st = Status::InternalError( |
675 | 0 | "Invalid table block column {} at {}: table_column='{}', " |
676 | 0 | "global_index={}, type={}, column=null, expected_rows={}, mapping={}", |
677 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
678 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
679 | 0 | rows, mapping.debug_string()); |
680 | 0 | LOG(WARNING) << st; |
681 | 0 | return st; |
682 | 0 | } |
683 | 6.08k | column->sanity_check(); |
684 | 6.08k | auto st = column_with_type.check_type_and_column_match(); |
685 | 6.08k | if (!st.ok()) { |
686 | 0 | auto contextual_status = Status::InternalError( |
687 | 0 | "Invalid table block column {} at {}: table_column='{}', " |
688 | 0 | "global_index={}, type={}, column={}, column_size={}, " |
689 | 0 | "expected_rows={}, error={}, mapping={}", |
690 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
691 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
692 | 0 | column->get_name(), column->size(), rows, st.to_string(), |
693 | 0 | mapping.debug_string()); |
694 | 0 | LOG(WARNING) << contextual_status; |
695 | 0 | return contextual_status; |
696 | 0 | } |
697 | 6.08k | } catch (const Exception& e) { |
698 | 0 | auto st = Status::InternalError( |
699 | 0 | "Invalid table block column {} at {}: table_column='{}', global_index={}, " |
700 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
701 | 0 | "mapping={}", |
702 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
703 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
704 | 0 | column == nullptr ? "null" : column->get_name(), |
705 | 0 | column == nullptr ? 0 : column->size(), rows, e.to_string(), |
706 | 0 | mapping.debug_string()); |
707 | 0 | LOG(WARNING) << st; |
708 | 0 | return st; |
709 | 0 | } catch (const std::exception& e) { |
710 | 0 | auto st = Status::InternalError( |
711 | 0 | "Invalid table block column {} at {}: table_column='{}', global_index={}, " |
712 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
713 | 0 | "mapping={}", |
714 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
715 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
716 | 0 | column == nullptr ? "null" : column->get_name(), |
717 | 0 | column == nullptr ? 0 : column->size(), rows, e.what(), |
718 | 0 | mapping.debug_string()); |
719 | 0 | LOG(WARNING) << st; |
720 | 0 | return st; |
721 | 0 | } |
722 | 6.08k | } |
723 | 713 | return Status::OK(); |
724 | 713 | } |
725 | | #endif |
726 | | |
727 | 713 | Status _truncate_char_or_varchar_columns(Block* block) { |
728 | 713 | DORIS_CHECK(block != nullptr); |
729 | 713 | if (_runtime_state == nullptr || |
730 | 713 | !_runtime_state->query_options().truncate_char_or_varchar_columns) { |
731 | 713 | return Status::OK(); |
732 | 713 | } |
733 | 0 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
734 | 0 | for (size_t idx = 0; idx < _data_reader.column_mapper->mappings().size(); ++idx) { |
735 | 0 | const auto& mapping = _data_reader.column_mapper->mappings()[idx]; |
736 | 0 | if (!_should_truncate_char_or_varchar_column(mapping)) { |
737 | 0 | continue; |
738 | 0 | } |
739 | 0 | const auto target_len = |
740 | 0 | assert_cast<const DataTypeString*>(remove_nullable(mapping.table_type).get()) |
741 | 0 | ->len(); |
742 | 0 | _truncate_char_or_varchar_column(block, idx, target_len); |
743 | 0 | } |
744 | 0 | return Status::OK(); |
745 | 713 | } |
746 | | |
747 | | // Return true when the table schema has a bounded CHAR/VARCHAR length that is stricter than |
748 | | // the file-side type. Examples: |
749 | | // - table VARCHAR(10), file VARCHAR(20): truncate to 10; |
750 | | // - table VARCHAR(10), file STRING: truncate to 10 because STRING has no declared bound; |
751 | | // - table STRING, any file type: no truncation because the target has no bound. |
752 | 5 | static bool _should_truncate_char_or_varchar_column(const ColumnMapping& mapping) { |
753 | 5 | if (mapping.table_type == nullptr) { |
754 | 0 | return false; |
755 | 0 | } |
756 | 5 | const auto table_type = remove_nullable(mapping.table_type); |
757 | 5 | const auto primitive_type = table_type->get_primitive_type(); |
758 | 5 | if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) { |
759 | 1 | return false; |
760 | 1 | } |
761 | 4 | const auto target_len = assert_cast<const DataTypeString*>(table_type.get())->len(); |
762 | 4 | if (target_len <= 0) { |
763 | 0 | return false; |
764 | 0 | } |
765 | 4 | if (mapping.file_type == nullptr) { |
766 | 0 | return true; |
767 | 0 | } |
768 | 4 | const auto file_type = remove_nullable(mapping.file_type); |
769 | 4 | DORIS_CHECK(file_type != nullptr); |
770 | 4 | int file_len = -1; |
771 | 4 | if (file_type->get_primitive_type() == TYPE_VARCHAR || |
772 | 4 | file_type->get_primitive_type() == TYPE_CHAR || |
773 | 4 | file_type->get_primitive_type() == TYPE_STRING) { |
774 | 3 | file_len = assert_cast<const DataTypeString*>(file_type.get())->len(); |
775 | 3 | } |
776 | | |
777 | 4 | return file_len < 0 || target_len < file_len; |
778 | 4 | } |
779 | | |
780 | | // Truncate a materialized CHAR/VARCHAR column in place by reusing the vectorized substring |
781 | | // implementation: substring(column, 1, len). Nullable columns are unwrapped before substring |
782 | | // execution and wrapped back with the original null map afterward, because substring operates |
783 | | // on the nested string payload only. |
784 | 1 | static void _truncate_char_or_varchar_column(Block* block, size_t idx, int len) { |
785 | 1 | DORIS_CHECK(block != nullptr); |
786 | 1 | auto int_type = std::make_shared<DataTypeInt32>(); |
787 | 1 | const auto num_columns_without_result = cast_set<uint32_t>(block->columns()); |
788 | 1 | auto& target = block->get_by_position(idx); |
789 | 1 | const bool is_nullable = target.type->is_nullable(); |
790 | 1 | ColumnPtr input_column = target.column; |
791 | 1 | ColumnPtr null_map_column; |
792 | 1 | if (is_nullable) { |
793 | 1 | const auto* nullable_column = assert_cast<const ColumnNullable*>(target.column.get()); |
794 | 1 | input_column = nullable_column->get_nested_column_ptr(); |
795 | 1 | null_map_column = nullable_column->get_null_map_column_ptr(); |
796 | 1 | } |
797 | 1 | block->replace_by_position(idx, std::move(input_column)); |
798 | 1 | block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), |
799 | 1 | int_type, "const 1"}); |
800 | 1 | block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), |
801 | 1 | int_type, "const len"}); |
802 | 1 | block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); |
803 | | |
804 | 1 | ColumnNumbers temp_arguments(3); |
805 | 1 | temp_arguments[0] = cast_set<uint32_t>(idx); |
806 | 1 | temp_arguments[1] = num_columns_without_result; |
807 | 1 | temp_arguments[2] = num_columns_without_result + 1; |
808 | 1 | const uint32_t result_column_id = num_columns_without_result + 2; |
809 | 1 | SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows()); |
810 | | |
811 | 1 | ColumnPtr result_column = block->get_by_position(result_column_id).column; |
812 | 1 | if (is_nullable) { |
813 | 1 | result_column = ColumnNullable::create(std::move(result_column), null_map_column); |
814 | 1 | } |
815 | 1 | block->replace_by_position(idx, std::move(result_column)); |
816 | 1 | block->erase_tail(num_columns_without_result); |
817 | 1 | } |
818 | | |
819 | 531 | Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) { |
820 | 531 | DORIS_CHECK(block != nullptr); |
821 | 531 | DORIS_CHECK(pushed_down != nullptr); |
822 | 531 | *pushed_down = false; |
823 | 531 | block->clear_column_data(_projected_columns.size()); |
824 | 531 | _aggregate_pushdown_tried = true; |
825 | 531 | if (!_supports_aggregate_pushdown(_push_down_agg_type)) { |
826 | 524 | return Status::OK(); |
827 | 524 | } |
828 | | |
829 | 7 | FileAggregateRequest file_request; |
830 | 7 | RETURN_IF_ERROR(_build_file_aggregate_request(_push_down_agg_type, &file_request)); |
831 | 7 | FileAggregateResult file_result; |
832 | 7 | const auto status = _data_reader.reader->get_aggregate_result(file_request, &file_result); |
833 | 7 | if (status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) { |
834 | 1 | return Status::OK(); |
835 | 1 | } |
836 | 6 | RETURN_IF_ERROR(status); |
837 | 6 | RETURN_IF_ERROR( |
838 | 6 | _materialize_aggregate_pushdown_rows(_push_down_agg_type, file_result, block)); |
839 | 6 | *pushed_down = true; |
840 | 6 | RETURN_IF_ERROR(close_current_reader()); |
841 | 6 | return Status::OK(); |
842 | 6 | } |
843 | | |
844 | 538 | virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const { |
845 | | // Only COUNT and MIN/MAX can be push down. |
846 | 538 | if (agg_type != TPushAggOp::type::COUNT && agg_type != TPushAggOp::type::MINMAX) { |
847 | 515 | return false; |
848 | 515 | } |
849 | | // Only support aggregate pushdown when there is no delete, filter and column predicate, so |
850 | | // the reduced rows consumed by the upper aggregate remain semantically equivalent to a |
851 | | // normal scan. |
852 | 23 | if (_delete_rows != nullptr && !_delete_rows->empty()) { |
853 | 3 | return false; |
854 | 3 | } |
855 | 20 | if (!_table_filters.empty() || !_table_column_predicates.empty()) { |
856 | 3 | return false; |
857 | 3 | } |
858 | 17 | if (agg_type == TPushAggOp::type::COUNT) { |
859 | 5 | return true; |
860 | 5 | } |
861 | | // For MIN/MAX, only support direct file-to-table column mappings. The two emitted rows |
862 | | // must be enough for the upper MIN/MAX aggregate without evaluating default expressions or |
863 | | // virtual columns. |
864 | 14 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
865 | 14 | if (!mapping.file_local_id.has_value() || |
866 | 14 | mapping.virtual_column_type != TableVirtualColumnType::INVALID || |
867 | 14 | mapping.default_expr != nullptr || mapping.file_type == nullptr || |
868 | 14 | mapping.table_type == nullptr) { |
869 | 1 | return false; |
870 | 1 | } |
871 | 13 | if (!_can_push_down_minmax_for_mapping(mapping)) { |
872 | 1 | return false; |
873 | 1 | } |
874 | 13 | } |
875 | 10 | return true; |
876 | 12 | } |
877 | | |
878 | 6.08k | static ColumnPtr _detach_column(ColumnPtr column) { |
879 | 6.08k | DORIS_CHECK(column.get() != nullptr); |
880 | 6.08k | return IColumn::mutate(std::move(column)); |
881 | 6.08k | } |
882 | | |
883 | 58 | static Status _align_column_nullability(ColumnPtr* column, const DataTypePtr& table_type) { |
884 | 58 | DORIS_CHECK(column != nullptr); |
885 | 58 | DORIS_CHECK(column->get() != nullptr); |
886 | 58 | DORIS_CHECK(table_type != nullptr); |
887 | | // Must return non-const column |
888 | 58 | *column = (*column)->convert_to_full_column_if_const(); |
889 | 58 | if (table_type->is_nullable()) { |
890 | 23 | const auto& nested_type = |
891 | 23 | assert_cast<const DataTypeNullable&>(*table_type).get_nested_type(); |
892 | 23 | if (!(*column)->is_nullable()) { |
893 | 2 | RETURN_IF_ERROR(_align_column_nullability(column, nested_type)); |
894 | 2 | *column = make_nullable(*column); |
895 | 2 | return Status::OK(); |
896 | 2 | } |
897 | 21 | const auto& nullable_column = assert_cast<const ColumnNullable&>(**column); |
898 | 21 | ColumnPtr nested_column = nullable_column.get_nested_column_ptr(); |
899 | 21 | RETURN_IF_ERROR(_align_column_nullability(&nested_column, nested_type)); |
900 | 21 | *column = ColumnNullable::create(nested_column, |
901 | 21 | nullable_column.get_null_map_column_ptr()); |
902 | 21 | return Status::OK(); |
903 | 21 | } |
904 | 35 | if ((*column)->is_nullable()) { |
905 | 0 | const auto& nullable_column = assert_cast<const ColumnNullable&>(**column); |
906 | 0 | if (nullable_column.has_null()) { |
907 | 0 | return Status::InternalError( |
908 | 0 | "Default expression produced NULL for non-nullable table column"); |
909 | 0 | } |
910 | 0 | ColumnPtr nested_column = nullable_column.get_nested_column_ptr(); |
911 | 0 | RETURN_IF_ERROR(_align_column_nullability(&nested_column, table_type)); |
912 | 0 | *column = nested_column; |
913 | 0 | return Status::OK(); |
914 | 0 | } |
915 | 35 | if (const auto* array_type = typeid_cast<const DataTypeArray*>(table_type.get())) { |
916 | 1 | const auto& array_column = assert_cast<const ColumnArray&>(**column); |
917 | 1 | ColumnPtr nested_column = array_column.get_data_ptr(); |
918 | 1 | RETURN_IF_ERROR( |
919 | 1 | _align_column_nullability(&nested_column, array_type->get_nested_type())); |
920 | 1 | *column = ColumnArray::create(nested_column, array_column.get_offsets_ptr()); |
921 | 1 | return Status::OK(); |
922 | 1 | } |
923 | 34 | if (const auto* map_type = typeid_cast<const DataTypeMap*>(table_type.get())) { |
924 | 0 | const auto& map_column = assert_cast<const ColumnMap&>(**column); |
925 | 0 | ColumnPtr key_column = map_column.get_keys_ptr(); |
926 | 0 | ColumnPtr value_column = map_column.get_values_ptr(); |
927 | 0 | RETURN_IF_ERROR(_align_column_nullability(&key_column, map_type->get_key_type())); |
928 | 0 | RETURN_IF_ERROR(_align_column_nullability(&value_column, map_type->get_value_type())); |
929 | 0 | *column = ColumnMap::create(key_column, value_column, map_column.get_offsets_ptr()); |
930 | 0 | return Status::OK(); |
931 | 0 | } |
932 | 34 | if (const auto* struct_type = typeid_cast<const DataTypeStruct*>(table_type.get())) { |
933 | 5 | const auto& struct_column = assert_cast<const ColumnStruct&>(**column); |
934 | 5 | Columns columns = struct_column.get_columns_copy(); |
935 | 5 | DORIS_CHECK(columns.size() == struct_type->get_elements().size()); |
936 | 16 | for (size_t i = 0; i < columns.size(); ++i) { |
937 | 11 | RETURN_IF_ERROR( |
938 | 11 | _align_column_nullability(&columns[i], struct_type->get_element(i))); |
939 | 11 | } |
940 | 5 | *column = ColumnStruct::create(columns); |
941 | 5 | return Status::OK(); |
942 | 5 | } |
943 | 29 | return Status::OK(); |
944 | 34 | } |
945 | | |
946 | | static Status _execute_default_expr_without_root_type_check( |
947 | | const VExprContextSPtr& default_expr, const Block* block, |
948 | 5 | ColumnWithTypeAndName* result_data) { |
949 | 5 | DORIS_CHECK(default_expr != nullptr); |
950 | 5 | DORIS_CHECK(block != nullptr); |
951 | 5 | DORIS_CHECK(result_data != nullptr); |
952 | 5 | ColumnPtr result_column; |
953 | 5 | Status st; |
954 | 5 | RETURN_IF_CATCH_EXCEPTION({ |
955 | 5 | st = default_expr->root()->execute_column_impl(default_expr.get(), block, nullptr, |
956 | 5 | block->rows(), result_column); |
957 | 5 | }); |
958 | 5 | RETURN_IF_ERROR(st); |
959 | 5 | DORIS_CHECK(result_column.get() != nullptr); |
960 | 5 | if (result_column->size() != block->rows()) { |
961 | 0 | return Status::InternalError( |
962 | 0 | "Default expr {} return column size {} not equal to expected size {}", |
963 | 0 | default_expr->expr_name(), result_column->size(), block->rows()); |
964 | 0 | } |
965 | 5 | result_data->column = result_column; |
966 | 5 | result_data->type = default_expr->execute_type(block); |
967 | 5 | result_data->name = default_expr->expr_name(); |
968 | 5 | return Status::OK(); |
969 | 5 | } |
970 | | |
971 | | Status _cast_column_to_type(ColumnPtr* column, const DataTypePtr& file_type, |
972 | | const DataTypePtr& table_type, |
973 | 0 | const std::string& column_name) const { |
974 | 0 | DORIS_CHECK(column != nullptr); |
975 | 0 | DORIS_CHECK(column->get() != nullptr); |
976 | 0 | DORIS_CHECK(file_type != nullptr); |
977 | 0 | DORIS_CHECK(table_type != nullptr); |
978 | 0 | if (file_type->equals(*table_type)) { |
979 | 0 | return Status::OK(); |
980 | 0 | } |
981 | | |
982 | 0 | DataTypePtr input_type = file_type; |
983 | 0 | if ((*column)->is_nullable() && !input_type->is_nullable()) { |
984 | 0 | input_type = make_nullable(input_type); |
985 | 0 | } |
986 | 0 | Block cast_block; |
987 | 0 | cast_block.insert({*column, input_type, column_name}); |
988 | 0 | auto slot_ref = VSlotRef::create_shared(0, 0, -1, input_type, column_name); |
989 | 0 | auto cast_expr = Cast::create_shared(table_type); |
990 | 0 | cast_expr->add_child(std::move(slot_ref)); |
991 | 0 | auto cast_ctx = VExprContext::create_shared(std::move(cast_expr)); |
992 | 0 | RowDescriptor row_desc; |
993 | 0 | RETURN_IF_ERROR(cast_ctx->prepare(_runtime_state, row_desc)); |
994 | 0 | RETURN_IF_ERROR(cast_ctx->open(_runtime_state)); |
995 | 0 | ColumnPtr cast_column; |
996 | 0 | RETURN_IF_ERROR(cast_ctx->execute(&cast_block, cast_column)); |
997 | 0 | *column = std::move(cast_column); |
998 | 0 | return Status::OK(); |
999 | 0 | } |
1000 | | |
1001 | | Status _materialize_present_child_mapping_column(const ColumnMapping& mapping, |
1002 | | const ColumnPtr& file_column, |
1003 | 18 | const size_t rows, ColumnPtr* column) { |
1004 | 18 | DORIS_CHECK(column != nullptr); |
1005 | 18 | DORIS_CHECK(mapping.file_type != nullptr); |
1006 | 18 | DORIS_CHECK(mapping.table_type != nullptr); |
1007 | 18 | *column = file_column; |
1008 | 18 | if (!mapping.is_trivial) { |
1009 | 5 | if (!mapping.child_mappings.empty()) { |
1010 | 5 | RETURN_IF_ERROR( |
1011 | 5 | _materialize_complex_mapping_column(mapping, *column, rows, column)); |
1012 | 5 | } else { |
1013 | 0 | RETURN_IF_ERROR(_cast_column_to_type(column, mapping.file_type, mapping.table_type, |
1014 | 0 | mapping.file_column_name)); |
1015 | 0 | } |
1016 | 5 | } |
1017 | 18 | RETURN_IF_ERROR(_align_column_nullability(column, mapping.table_type)); |
1018 | 18 | return Status::OK(); |
1019 | 18 | } |
1020 | | |
1021 | | Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block, |
1022 | 6.08k | const size_t rows, ColumnPtr* column) { |
1023 | 6.08k | if (!mapping.is_trivial && mapping.file_local_id.has_value() && |
1024 | 6.08k | !mapping.child_mappings.empty()) { |
1025 | 5 | DCHECK(mapping.projection != nullptr); |
1026 | 5 | int res_id; |
1027 | 5 | auto st = mapping.projection->execute(current_block, &res_id); |
1028 | 5 | if (!st.ok()) { |
1029 | 0 | return Status::InternalError( |
1030 | 0 | "Failed to execute complex mapping projection for table column '{}' " |
1031 | 0 | "(global_index={}, file_local_id={}, rows={}): {}, mapping={}", |
1032 | 0 | mapping.table_column_name, mapping.global_index.value(), |
1033 | 0 | *mapping.file_local_id, rows, st.to_string(), mapping.debug_string()); |
1034 | 0 | } |
1035 | 5 | ColumnPtr result_column = current_block->get_by_position(res_id).column; |
1036 | 5 | RETURN_IF_ERROR( |
1037 | 5 | _materialize_complex_mapping_column(mapping, result_column, rows, column)); |
1038 | 5 | return Status::OK(); |
1039 | 5 | } |
1040 | 6.08k | if (mapping.projection != nullptr) { |
1041 | 6.06k | int res_id; |
1042 | 6.06k | auto st = mapping.projection->execute(current_block, &res_id); |
1043 | 6.06k | if (!st.ok()) { |
1044 | 0 | std::string file_local_id = "null"; |
1045 | 0 | if (mapping.file_local_id.has_value()) { |
1046 | 0 | file_local_id = std::to_string(*mapping.file_local_id); |
1047 | 0 | } |
1048 | 0 | return Status::InternalError( |
1049 | 0 | "Failed to execute mapping projection for table column '{}' " |
1050 | 0 | "(global_index={}, file_local_id={}, rows={}): {}, mapping={}", |
1051 | 0 | mapping.table_column_name, mapping.global_index.value(), file_local_id, |
1052 | 0 | rows, st.to_string(), mapping.debug_string()); |
1053 | 0 | } |
1054 | 6.06k | ColumnPtr result_column = current_block->get_by_position(res_id).column; |
1055 | 6.06k | *column = _detach_column(std::move(result_column)); |
1056 | 6.06k | return Status::OK(); |
1057 | 6.06k | } |
1058 | 18 | if (mapping.default_expr != nullptr) { |
1059 | 5 | if (current_block->rows() == rows) { |
1060 | 0 | ColumnWithTypeAndName result; |
1061 | 0 | RETURN_IF_ERROR(_execute_default_expr_without_root_type_check( |
1062 | 0 | mapping.default_expr, current_block, &result)); |
1063 | 0 | ColumnPtr result_column = result.column; |
1064 | 0 | RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type)); |
1065 | 0 | *column = _detach_column(std::move(result_column)); |
1066 | 5 | } else { |
1067 | 5 | DORIS_CHECK(mapping.constant_index.has_value()); |
1068 | 5 | Block eval_block; |
1069 | 5 | eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows), |
1070 | 5 | mapping.table_type, "__table_reader_const_rows"}); |
1071 | 5 | ColumnWithTypeAndName result; |
1072 | 5 | RETURN_IF_ERROR(_execute_default_expr_without_root_type_check( |
1073 | 5 | mapping.default_expr, &eval_block, &result)); |
1074 | 5 | ColumnPtr result_column = result.column; |
1075 | 5 | RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type)); |
1076 | 5 | *column = _detach_column(std::move(result_column)); |
1077 | 5 | } |
1078 | 5 | return Status::OK(); |
1079 | 5 | } |
1080 | 13 | ColumnPtr result_column = mapping.table_type->create_column_const_with_default_value(rows); |
1081 | 13 | *column = _detach_column(std::move(result_column)); |
1082 | 13 | return Status::OK(); |
1083 | 18 | } |
1084 | | |
1085 | | Status _materialize_complex_mapping_column(const ColumnMapping& mapping, |
1086 | | const ColumnPtr& file_column, const size_t rows, |
1087 | 10 | ColumnPtr* column) { |
1088 | 10 | DORIS_CHECK(mapping.table_type != nullptr); |
1089 | 10 | DORIS_CHECK(file_column.get() != nullptr); |
1090 | 10 | const auto table_type = remove_nullable(mapping.table_type); |
1091 | 10 | switch (table_type->get_primitive_type()) { |
1092 | 7 | case TYPE_STRUCT: |
1093 | 7 | RETURN_IF_ERROR(_materialize_struct_mapping_column(mapping, file_column, rows, column)); |
1094 | 7 | break; |
1095 | 7 | case TYPE_ARRAY: |
1096 | 2 | RETURN_IF_ERROR(_materialize_array_mapping_column(mapping, file_column, rows, column)); |
1097 | 2 | break; |
1098 | 2 | case TYPE_MAP: |
1099 | 1 | RETURN_IF_ERROR(_materialize_map_mapping_column(mapping, file_column, rows, column)); |
1100 | 1 | break; |
1101 | 1 | default: |
1102 | 0 | *column = _detach_column(file_column); |
1103 | 0 | break; |
1104 | 10 | } |
1105 | 10 | return Status::OK(); |
1106 | 10 | } |
1107 | | |
1108 | | static std::vector<const ColumnMapping*> _present_child_mappings_in_file_order( |
1109 | 7 | const std::vector<ColumnMapping>& child_mappings) { |
1110 | 7 | std::vector<const ColumnMapping*> result; |
1111 | 7 | result.reserve(child_mappings.size()); |
1112 | 16 | for (const auto& child_mapping : child_mappings) { |
1113 | 16 | if (child_mapping.file_local_id.has_value()) { |
1114 | 11 | result.push_back(&child_mapping); |
1115 | 11 | } |
1116 | 16 | } |
1117 | 7 | std::ranges::sort(result, [](const ColumnMapping* lhs, const ColumnMapping* rhs) { |
1118 | 6 | DORIS_CHECK(lhs->file_local_id.has_value()); |
1119 | 6 | DORIS_CHECK(rhs->file_local_id.has_value()); |
1120 | 6 | return *lhs->file_local_id < *rhs->file_local_id; |
1121 | 6 | }); |
1122 | 7 | return result; |
1123 | 7 | } |
1124 | | |
1125 | | static size_t _file_child_ordinal_for_mapping( |
1126 | | const ColumnMapping& mapping, const ColumnMapping& child_mapping, |
1127 | 11 | const std::vector<const ColumnMapping*>& file_ordered_children) { |
1128 | 11 | DORIS_CHECK(child_mapping.file_local_id.has_value()); |
1129 | 11 | if (!mapping.projected_file_children.empty()) { |
1130 | 7 | const auto child_it = std::ranges::find_if( |
1131 | 10 | mapping.projected_file_children, [&](const ColumnDefinition& file_child) { |
1132 | 10 | return file_child.file_local_id() == *child_mapping.file_local_id; |
1133 | 10 | }); |
1134 | 7 | DORIS_CHECK(child_it != mapping.projected_file_children.end()); |
1135 | 7 | return static_cast<size_t>( |
1136 | 7 | std::distance(mapping.projected_file_children.begin(), child_it)); |
1137 | 7 | } |
1138 | 4 | const auto child_it = std::ranges::find(file_ordered_children, &child_mapping); |
1139 | 4 | DORIS_CHECK(child_it != file_ordered_children.end()); |
1140 | 4 | return static_cast<size_t>(std::distance(file_ordered_children.begin(), child_it)); |
1141 | 11 | } |
1142 | | |
1143 | | static std::vector<const ColumnMapping*> _child_mappings_in_table_type_order( |
1144 | 7 | const ColumnMapping& mapping, const DataTypeStruct& table_type) { |
1145 | 7 | std::vector<const ColumnMapping*> result; |
1146 | 7 | result.reserve(mapping.child_mappings.size()); |
1147 | 23 | for (size_t child_idx = 0; child_idx < table_type.get_elements().size(); ++child_idx) { |
1148 | 16 | const auto& child_name = table_type.get_element_name(child_idx); |
1149 | 16 | const auto child_it = std::ranges::find_if( |
1150 | 28 | mapping.child_mappings, [&](const ColumnMapping& child_mapping) { |
1151 | 28 | return child_mapping.table_column_name == child_name; |
1152 | 28 | }); |
1153 | 16 | DORIS_CHECK(child_it != mapping.child_mappings.end()) |
1154 | 0 | << mapping.debug_string() << ", table_child_name=" << child_name; |
1155 | 16 | result.push_back(&*child_it); |
1156 | 16 | } |
1157 | 7 | return result; |
1158 | 7 | } |
1159 | | |
1160 | | static const IColumn* _nested_column_if_nullable(const ColumnPtr& column, |
1161 | 12 | const NullMap** null_map) { |
1162 | 12 | DORIS_CHECK(column.get() != nullptr); |
1163 | 12 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { |
1164 | 8 | if (null_map != nullptr) { |
1165 | 8 | *null_map = &nullable_column->get_null_map_data(); |
1166 | 8 | } |
1167 | 8 | return &nullable_column->get_nested_column(); |
1168 | 8 | } |
1169 | 4 | return column.get(); |
1170 | 12 | } |
1171 | | |
1172 | | Status _materialize_struct_mapping_column(const ColumnMapping& mapping, |
1173 | | const ColumnPtr& file_column, const size_t rows, |
1174 | 7 | ColumnPtr* column) { |
1175 | 7 | DORIS_CHECK(mapping.table_type != nullptr); |
1176 | 7 | const auto* table_type = |
1177 | 7 | assert_cast<const DataTypeStruct*>(remove_nullable(mapping.table_type).get()); |
1178 | 7 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
1179 | 7 | const NullMap* parent_null_map = nullptr; |
1180 | 7 | const auto* nested_file_column = |
1181 | 7 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1182 | 7 | const auto* file_struct = assert_cast<const ColumnStruct*>(nested_file_column); |
1183 | 7 | DORIS_CHECK(table_type->get_elements().size() == mapping.child_mappings.size()); |
1184 | | |
1185 | 7 | Columns child_columns; |
1186 | 7 | child_columns.reserve(mapping.child_mappings.size()); |
1187 | 7 | const auto file_ordered_children = |
1188 | 7 | _present_child_mappings_in_file_order(mapping.child_mappings); |
1189 | 7 | const auto table_ordered_children = |
1190 | 7 | _child_mappings_in_table_type_order(mapping, *table_type); |
1191 | 16 | for (const auto* child_mapping : table_ordered_children) { |
1192 | 16 | DORIS_CHECK(child_mapping != nullptr); |
1193 | 16 | if (!child_mapping->file_local_id.has_value()) { |
1194 | 5 | child_columns.push_back( |
1195 | 5 | child_mapping->table_type->create_column_const_with_default_value(rows) |
1196 | 5 | ->convert_to_full_column_if_const()); |
1197 | 5 | continue; |
1198 | 5 | } |
1199 | 11 | const auto file_child_idx = |
1200 | 11 | _file_child_ordinal_for_mapping(mapping, *child_mapping, file_ordered_children); |
1201 | 11 | DORIS_CHECK(file_child_idx < file_struct->get_columns().size()); |
1202 | 11 | ColumnPtr child_column = file_struct->get_column_ptr(file_child_idx); |
1203 | 11 | RETURN_IF_ERROR(_materialize_present_child_mapping_column(*child_mapping, child_column, |
1204 | 11 | rows, &child_column)); |
1205 | 11 | child_columns.push_back(std::move(child_column)); |
1206 | 11 | } |
1207 | 7 | MutableColumns mutable_child_columns; |
1208 | 7 | mutable_child_columns.reserve(child_columns.size()); |
1209 | 16 | for (auto& child_column : child_columns) { |
1210 | 16 | mutable_child_columns.push_back(IColumn::mutate(std::move(child_column))); |
1211 | 16 | } |
1212 | 7 | auto result = ColumnStruct::create(std::move(mutable_child_columns)); |
1213 | 7 | if (mapping.table_type->is_nullable()) { |
1214 | 5 | auto null_map = ColumnUInt8::create(); |
1215 | 5 | auto& null_map_data = null_map->get_data(); |
1216 | 5 | null_map_data.resize(rows); |
1217 | 5 | if (parent_null_map != nullptr) { |
1218 | 5 | DORIS_CHECK(parent_null_map->size() == rows); |
1219 | 5 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1220 | 5 | } else { |
1221 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1222 | 0 | } |
1223 | 5 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1224 | 5 | } else { |
1225 | 2 | *column = std::move(result); |
1226 | 2 | } |
1227 | 7 | return Status::OK(); |
1228 | 7 | } |
1229 | | |
1230 | | Status _materialize_array_mapping_column(const ColumnMapping& mapping, |
1231 | | const ColumnPtr& file_column, const size_t rows, |
1232 | 2 | ColumnPtr* column) { |
1233 | 2 | DORIS_CHECK(mapping.child_mappings.size() == 1); |
1234 | 2 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
1235 | 2 | const NullMap* parent_null_map = nullptr; |
1236 | 2 | const auto* nested_file_column = |
1237 | 2 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1238 | 2 | const auto* file_array = assert_cast<const ColumnArray*>(nested_file_column); |
1239 | 2 | ColumnPtr nested_column = file_array->get_data_ptr(); |
1240 | 2 | const auto& element_mapping = mapping.child_mappings[0]; |
1241 | 2 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1242 | 2 | element_mapping, nested_column, nested_column->size(), &nested_column)); |
1243 | 2 | auto offsets_column = file_array->get_offsets_ptr()->convert_to_full_column_if_const(); |
1244 | 2 | auto result = ColumnArray::create(IColumn::mutate(std::move(nested_column)), |
1245 | 2 | IColumn::mutate(std::move(offsets_column))); |
1246 | 2 | if (mapping.table_type->is_nullable()) { |
1247 | 2 | auto null_map = ColumnUInt8::create(); |
1248 | 2 | auto& null_map_data = null_map->get_data(); |
1249 | 2 | null_map_data.resize(rows); |
1250 | 2 | if (parent_null_map != nullptr) { |
1251 | 2 | DORIS_CHECK(parent_null_map->size() == rows); |
1252 | 2 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1253 | 2 | } else { |
1254 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1255 | 0 | } |
1256 | 2 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1257 | 2 | } else { |
1258 | 0 | *column = std::move(result); |
1259 | 0 | } |
1260 | 2 | return Status::OK(); |
1261 | 2 | } |
1262 | | |
1263 | | Status _materialize_map_mapping_column(const ColumnMapping& mapping, |
1264 | | const ColumnPtr& file_column, const size_t rows, |
1265 | 3 | ColumnPtr* column) { |
1266 | 3 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
1267 | 3 | const NullMap* parent_null_map = nullptr; |
1268 | 3 | const auto* nested_file_column = |
1269 | 3 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1270 | 3 | const auto* file_map = assert_cast<const ColumnMap*>(nested_file_column); |
1271 | 3 | ColumnPtr key_column = file_map->get_keys_ptr(); |
1272 | 3 | ColumnPtr value_column = file_map->get_values_ptr(); |
1273 | | |
1274 | 3 | const ColumnMapping* key_mapping = nullptr; |
1275 | 3 | const ColumnMapping* value_mapping = nullptr; |
1276 | 5 | for (const auto& child_mapping : mapping.child_mappings) { |
1277 | 5 | if (!child_mapping.file_local_id.has_value()) { |
1278 | 0 | continue; |
1279 | 0 | } |
1280 | 5 | if (*child_mapping.file_local_id == 0) { |
1281 | 2 | key_mapping = &child_mapping; |
1282 | 3 | } else if (*child_mapping.file_local_id == 1) { |
1283 | 3 | value_mapping = &child_mapping; |
1284 | 3 | } |
1285 | 5 | } |
1286 | | |
1287 | 3 | if (key_mapping != nullptr) { |
1288 | 2 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1289 | 2 | *key_mapping, key_column, key_column->size(), &key_column)); |
1290 | 2 | } |
1291 | 3 | if (value_mapping != nullptr) { |
1292 | 3 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1293 | 3 | *value_mapping, value_column, value_column->size(), &value_column)); |
1294 | 3 | } |
1295 | 3 | auto offsets_column = file_map->get_offsets_ptr()->convert_to_full_column_if_const(); |
1296 | 3 | auto result = ColumnMap::create(IColumn::mutate(std::move(key_column)), |
1297 | 3 | IColumn::mutate(std::move(value_column)), |
1298 | 3 | IColumn::mutate(std::move(offsets_column))); |
1299 | 3 | if (mapping.table_type->is_nullable()) { |
1300 | 1 | auto null_map = ColumnUInt8::create(); |
1301 | 1 | auto& null_map_data = null_map->get_data(); |
1302 | 1 | null_map_data.resize(rows); |
1303 | 1 | if (parent_null_map != nullptr) { |
1304 | 1 | DORIS_CHECK(parent_null_map->size() == rows); |
1305 | 1 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1306 | 1 | } else { |
1307 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1308 | 0 | } |
1309 | 1 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1310 | 2 | } else { |
1311 | 2 | *column = std::move(result); |
1312 | 2 | } |
1313 | 3 | return Status::OK(); |
1314 | 3 | } |
1315 | | |
1316 | 530 | Status _open_mapping_exprs() { |
1317 | 530 | RowDescriptor row_desc; |
1318 | 3.16k | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
1319 | 3.16k | if (mapping.projection != nullptr) { |
1320 | 3.14k | RETURN_IF_ERROR(mapping.projection->prepare(_runtime_state, row_desc)); |
1321 | 3.14k | RETURN_IF_ERROR(mapping.projection->open(_runtime_state)); |
1322 | 3.14k | } |
1323 | 3.16k | if (mapping.default_expr != nullptr) { |
1324 | 5 | RETURN_IF_ERROR(mapping.default_expr->prepare(_runtime_state, row_desc)); |
1325 | 5 | RETURN_IF_ERROR(mapping.default_expr->open(_runtime_state)); |
1326 | 5 | } |
1327 | 3.16k | } |
1328 | 530 | return Status::OK(); |
1329 | 530 | } |
1330 | | |
1331 | | Status _build_file_aggregate_request(TPushAggOp::type agg_type, |
1332 | 7 | FileAggregateRequest* request) const { |
1333 | 7 | DORIS_CHECK(request != nullptr); |
1334 | 7 | DORIS_CHECK(_supports_aggregate_pushdown(agg_type)); |
1335 | 7 | request->agg_type = agg_type; |
1336 | 7 | request->columns.clear(); |
1337 | 7 | if (agg_type == TPushAggOp::type::COUNT) { |
1338 | | // COUNT pushdown historically meant COUNT(*) and therefore carried no columns. For |
1339 | | // complex COUNT(col), materializing the full MAP/LIST/STRUCT value only to test the |
1340 | | // top-level NULL bit can be extremely expensive. When the scan projects exactly one |
1341 | | // directly-mapped complex column, pass that file column to the reader so formats such |
1342 | | // as Parquet can count the column shape from metadata/levels without decoding payload |
1343 | | // values like MAP value strings. Other COUNT cases stay on the existing row-count path |
1344 | | // to avoid changing count(*) semantics. |
1345 | 2 | if (_data_reader.column_mapper->mappings().size() == 1) { |
1346 | 2 | const auto& mapping = _data_reader.column_mapper->mappings()[0]; |
1347 | 2 | if (mapping.file_local_id.has_value() && mapping.file_type != nullptr && |
1348 | 2 | is_complex_type(remove_nullable(mapping.file_type)->get_primitive_type()) && |
1349 | 2 | mapping.virtual_column_type == TableVirtualColumnType::INVALID && |
1350 | 2 | mapping.default_expr == nullptr) { |
1351 | 0 | FileAggregateRequest::Column column; |
1352 | 0 | column.projection = |
1353 | 0 | LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id)); |
1354 | 0 | request->columns.push_back(std::move(column)); |
1355 | 0 | } |
1356 | 2 | } |
1357 | 2 | return Status::OK(); |
1358 | 2 | } |
1359 | 5 | request->columns.reserve(_data_reader.column_mapper->mappings().size()); |
1360 | 6 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
1361 | 6 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1362 | 6 | FileAggregateRequest::Column column; |
1363 | 6 | column.projection = LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id)); |
1364 | 6 | if (!mapping.child_mappings.empty()) { |
1365 | 1 | RETURN_IF_ERROR(build_aggregate_projection(mapping, &column.projection)); |
1366 | 1 | } |
1367 | 6 | request->columns.push_back(std::move(column)); |
1368 | 6 | } |
1369 | 5 | return Status::OK(); |
1370 | 5 | } |
1371 | | |
1372 | | Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type, |
1373 | | const FileAggregateResult& file_result, |
1374 | 6 | Block* block) { |
1375 | 6 | if (agg_type == TPushAggOp::type::COUNT) { |
1376 | | // COUNT pushdown is not a final count value. It emits `count` default rows so the |
1377 | | // upper COUNT(*) aggregate can count them and produce the final result, including |
1378 | | // zero rows when count is 0. |
1379 | 2 | DORIS_CHECK(file_result.count >= 0); |
1380 | 2 | return _materialize_count_rows(cast_set<size_t>(file_result.count), block); |
1381 | 2 | } |
1382 | | // MIN/MAX pushdown emits two rows, min first and max second, for each projected column. |
1383 | | // The upper MIN/MAX aggregate consumes those two rows to produce the final aggregate value. |
1384 | 4 | DORIS_CHECK(file_result.columns.size() == _data_reader.column_mapper->mappings().size()); |
1385 | 4 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
1386 | 4 | Block file_block; |
1387 | 4 | file_block.reserve(_data_reader.file_block_layout.size()); |
1388 | 5 | for (const auto& column : _data_reader.file_block_layout) { |
1389 | 5 | file_block.insert({column.type->create_column(), column.type, column.name}); |
1390 | 5 | } |
1391 | 9 | for (size_t column_idx = 0; column_idx < file_result.columns.size(); ++column_idx) { |
1392 | 5 | const auto& result_column = file_result.columns[column_idx]; |
1393 | 5 | if (!result_column.has_min || !result_column.has_max) { |
1394 | 0 | return Status::NotSupported("Missing min/max aggregate result for column {}", |
1395 | 0 | _projected_columns[column_idx].name); |
1396 | 0 | } |
1397 | 5 | bool found_file_column = false; |
1398 | 6 | for (size_t block_position = 0; block_position < _data_reader.file_block_layout.size(); |
1399 | 6 | ++block_position) { |
1400 | 6 | if (_data_reader.file_block_layout[block_position].file_column_id == |
1401 | 6 | file_result.columns[column_idx].projection.column_id()) { |
1402 | 5 | found_file_column = true; |
1403 | 5 | auto column = file_block.get_by_position(block_position) |
1404 | 5 | .type->create_column() |
1405 | 5 | ->assert_mutable(); |
1406 | 5 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1407 | 5 | file_result.columns[column_idx].projection, result_column.min_value, |
1408 | 5 | column.get())); |
1409 | 5 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1410 | 5 | file_result.columns[column_idx].projection, result_column.max_value, |
1411 | 5 | column.get())); |
1412 | 5 | file_block.replace_by_position(block_position, std::move(column)); |
1413 | 5 | break; |
1414 | 5 | } |
1415 | 6 | } |
1416 | 5 | DORIS_CHECK(found_file_column); |
1417 | 5 | } |
1418 | 9 | for (size_t column_idx = 0; column_idx < _data_reader.column_mapper->mappings().size(); |
1419 | 5 | ++column_idx) { |
1420 | 5 | ColumnPtr table_column; |
1421 | 5 | RETURN_IF_ERROR( |
1422 | 5 | _materialize_mapping_column(_data_reader.column_mapper->mappings()[column_idx], |
1423 | 5 | &file_block, 2, &table_column)); |
1424 | 5 | block->replace_by_position(column_idx, std::move(table_column)); |
1425 | 5 | } |
1426 | 4 | return Status::OK(); |
1427 | 4 | } |
1428 | | |
1429 | | struct FileBlockColumn { |
1430 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
1431 | | std::string name; |
1432 | | DataTypePtr type; |
1433 | | }; |
1434 | | |
1435 | | struct DataReader { |
1436 | | std::unique_ptr<FileReader> reader; |
1437 | | std::unique_ptr<TableColumnMapper> column_mapper; |
1438 | | // Schema of the data file, also including virtual column (row position). |
1439 | | std::vector<ColumnDefinition> file_schema; |
1440 | | // Layout of the block returned by file reader, determined by column mapping and file |
1441 | | // schema. It is used for file reader to materialize columns into correct type and position. |
1442 | | std::vector<FileBlockColumn> file_block_layout; |
1443 | | Block block_template; |
1444 | | }; |
1445 | | DataReader _data_reader; |
1446 | | std::vector<ColumnDefinition> _projected_columns; |
1447 | | std::unique_ptr<ScanTask> _current_task; |
1448 | | std::optional<io::FileDescription> _current_file_description; |
1449 | | // Range-level compression has higher priority than scan-param compression. TVF/load can keep |
1450 | | // the logical format as CSV/TEXT while carrying the concrete compression such as GZ or LZO on |
1451 | | // each TFileRangeDesc, matching the old FileScanner reader contract. |
1452 | | TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN; |
1453 | | std::optional<TUniqueId> _current_range_load_id; |
1454 | | TFileRangeDesc _current_file_range_desc; |
1455 | | std::shared_ptr<io::FileSystemProperties> _system_properties; |
1456 | | // partition key -> value |
1457 | | std::map<std::string, Field> _partition_values; |
1458 | | // Predicates built from scan conjuncts before file-level localization. |
1459 | | std::vector<TableFilter> _table_filters; |
1460 | | TableColumnPredicates _table_column_predicates; |
1461 | | VExprContextSPtrs _conjuncts; |
1462 | | ReadProfile _profile; |
1463 | | // Parsed from row-position based delete files, including position delete and deletion vector. |
1464 | | DeleteRows* _delete_rows = nullptr; |
1465 | | TFileScanRangeParams* _scan_params; |
1466 | | std::shared_ptr<io::IOContext> _io_ctx; |
1467 | | RuntimeState* _runtime_state; |
1468 | | RuntimeProfile* _scanner_profile; |
1469 | | const std::vector<SlotDescriptor*>* _file_slot_descs = nullptr; |
1470 | | FileFormat _format; |
1471 | | TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE; |
1472 | | size_t _batch_size = 0; |
1473 | | uint64_t _condition_cache_digest = 0; |
1474 | | segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key; |
1475 | | std::shared_ptr<std::vector<bool>> _condition_cache; |
1476 | | std::shared_ptr<ConditionCacheContext> _condition_cache_ctx; |
1477 | | int64_t _condition_cache_hit_count = 0; |
1478 | | bool _current_reader_reached_eof = false; |
1479 | | int64_t _remaining_table_level_count = -1; |
1480 | | std::optional<GlobalRowIdContext> _global_rowid_context; |
1481 | | bool _aggregate_pushdown_tried = false; |
1482 | | TableColumnMapperOptions _mapper_options; |
1483 | | |
1484 | | private: |
1485 | | static const ColumnDefinition* _find_column_definition( |
1486 | 3.18k | const std::vector<ColumnDefinition>& schema, LocalColumnId column_id) { |
1487 | 23.0k | for (const auto& field : schema) { |
1488 | 23.0k | if (field.file_local_id() == column_id.value()) { |
1489 | 3.16k | return &field; |
1490 | 3.16k | } |
1491 | 23.0k | } |
1492 | 17 | return nullptr; |
1493 | 3.18k | } |
1494 | | |
1495 | 15 | static bool _can_push_down_minmax_for_mapping(const ColumnMapping& mapping) { |
1496 | 15 | if (mapping.child_mappings.empty()) { |
1497 | 12 | return true; |
1498 | 12 | } |
1499 | 3 | const auto primitive_type = remove_nullable(mapping.file_type)->get_primitive_type(); |
1500 | 3 | if (primitive_type != TYPE_STRUCT) { |
1501 | 1 | return false; |
1502 | 1 | } |
1503 | 2 | size_t mapped_children = 0; |
1504 | 2 | const ColumnMapping* mapped_child = nullptr; |
1505 | 2 | for (const auto& child_mapping : mapping.child_mappings) { |
1506 | 2 | if (!child_mapping.file_local_id.has_value()) { |
1507 | 0 | continue; |
1508 | 0 | } |
1509 | 2 | ++mapped_children; |
1510 | 2 | mapped_child = &child_mapping; |
1511 | 2 | } |
1512 | 2 | return mapped_children == 1 && mapped_child != nullptr && |
1513 | 2 | _can_push_down_minmax_for_mapping(*mapped_child); |
1514 | 3 | } |
1515 | | |
1516 | | static Status build_aggregate_projection(const ColumnMapping& mapping, |
1517 | 2 | LocalColumnIndex* projection) { |
1518 | 2 | DORIS_CHECK(projection != nullptr); |
1519 | 2 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1520 | 2 | *projection = LocalColumnIndex::local(*mapping.file_local_id); |
1521 | 2 | projection->children.clear(); |
1522 | 2 | projection->project_all_children = true; |
1523 | 2 | if (mapping.child_mappings.empty()) { |
1524 | 1 | return Status::OK(); |
1525 | 1 | } |
1526 | 1 | projection->project_all_children = false; |
1527 | 1 | for (const auto& child_mapping : mapping.child_mappings) { |
1528 | 1 | if (!child_mapping.file_local_id.has_value()) { |
1529 | 0 | continue; |
1530 | 0 | } |
1531 | 1 | LocalColumnIndex child_projection; |
1532 | 1 | RETURN_IF_ERROR(build_aggregate_projection(child_mapping, &child_projection)); |
1533 | 1 | projection->children.push_back(std::move(child_projection)); |
1534 | 1 | } |
1535 | 1 | DORIS_CHECK(projection->children.size() == 1); |
1536 | 1 | return Status::OK(); |
1537 | 1 | } |
1538 | | |
1539 | | static Status _insert_aggregate_projection_value(const LocalColumnIndex& projection, |
1540 | 24 | const Field& value, IColumn* column) { |
1541 | 24 | DORIS_CHECK(column != nullptr); |
1542 | 24 | if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { |
1543 | 12 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1544 | 12 | projection, value, &nullable_column->get_nested_column())); |
1545 | 12 | nullable_column->get_null_map_data().push_back(0); |
1546 | 12 | return Status::OK(); |
1547 | 12 | } |
1548 | 12 | if (projection.project_all_children || projection.children.empty()) { |
1549 | 10 | column->insert(value); |
1550 | 10 | return Status::OK(); |
1551 | 10 | } |
1552 | 2 | auto* struct_column = assert_cast<ColumnStruct*>(column); |
1553 | 2 | DORIS_CHECK(projection.children.size() == 1); |
1554 | 2 | const auto& child_projection = projection.children[0]; |
1555 | 2 | DORIS_CHECK(struct_column->get_columns().size() == 1); |
1556 | 2 | RETURN_IF_ERROR(_insert_aggregate_projection_value(child_projection, value, |
1557 | 2 | &struct_column->get_column(0))); |
1558 | 2 | return Status::OK(); |
1559 | 2 | } |
1560 | | |
1561 | | // Parse row-position deletes from table format specific parameters, and fill in _delete_rows. |
1562 | | Status _parse_delete_predicates(const SplitReadOptions& options); |
1563 | | }; |
1564 | | |
1565 | | } // namespace doris::format |