Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <bvar/status.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <exception> |
24 | | #include <map> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <string> |
28 | | #include <string_view> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/cast_set.h" |
33 | | #include "common/exception.h" |
34 | | #include "common/logging.h" |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/block/block.h" |
38 | | #include "core/column/column_array.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/column/column_map.h" |
41 | | #include "core/column/column_nullable.h" |
42 | | #include "core/column/column_struct.h" |
43 | | #include "core/column/column_vector.h" |
44 | | #include "core/data_type/data_type.h" |
45 | | #include "core/data_type/data_type_array.h" |
46 | | #include "core/data_type/data_type_map.h" |
47 | | #include "core/data_type/data_type_nullable.h" |
48 | | #include "core/data_type/data_type_number.h" |
49 | | #include "core/data_type/data_type_string.h" |
50 | | #include "core/data_type/data_type_struct.h" |
51 | | #include "core/field.h" |
52 | | #include "exec/common/stringop_substring.h" |
53 | | #include "exprs/vexpr.h" |
54 | | #include "exprs/vexpr_context.h" |
55 | | #include "exprs/vexpr_fwd.h" |
56 | | #include "exprs/vslot_ref.h" |
57 | | #include "format_v2/column_data.h" |
58 | | #include "format_v2/column_mapper.h" |
59 | | #include "format_v2/expr/cast.h" |
60 | | #include "format_v2/expr/delete_predicate.h" |
61 | | #include "format_v2/file_reader.h" |
62 | | #include "format_v2/parquet/reader/column_reader.h" |
63 | | #include "format_v2/schema_projection.h" |
64 | | #include "gen_cpp/PlanNodes_types.h" |
65 | | #include "runtime/descriptors.h" |
66 | | #include "storage/segment/condition_cache.h" |
67 | | |
68 | | namespace doris { |
69 | | class Block; |
70 | | class ColumnPredicate; |
71 | | struct DeleteFileDesc; |
72 | | class RuntimeState; |
73 | | } // namespace doris |
74 | | |
75 | | namespace doris::format { |
76 | | |
77 | | using DeleteRows = std::vector<int64_t>; |
78 | | |
79 | | // Row-level predicates on table/global schema. They are rewritten to file-local expressions when |
80 | | // possible, and remain the source of row-level filtering after localization. |
81 | | struct TableFilter { |
82 | | VExprContextSPtr conjunct; |
83 | | std::vector<GlobalIndex> global_indices; |
84 | | }; |
85 | | |
86 | | struct ScanTask { |
87 | 76 | virtual ~ScanTask() = default; |
88 | | |
89 | | std::unique_ptr<io::FileDescription> data_file; |
90 | | }; |
91 | | |
92 | | struct ProjectedColumnBuildContext { |
93 | | const TFileScanRangeParams* scan_params = nullptr; |
94 | | const TFileRangeDesc* range = nullptr; |
95 | | RuntimeState* runtime_state = nullptr; |
96 | | std::optional<ColumnDefinition> schema_column = std::nullopt; |
97 | | size_t next_file_column_idx = 0; |
98 | | }; |
99 | | |
100 | | struct ReadProfile { |
101 | | RuntimeProfile::Counter* num_delete_files = nullptr; |
102 | | RuntimeProfile::Counter* num_delete_rows = nullptr; |
103 | | RuntimeProfile::Counter* parse_delete_file_time = nullptr; |
104 | | RuntimeProfile::Counter* exec_timer = nullptr; |
105 | | RuntimeProfile::Counter* prepare_split_timer = nullptr; |
106 | | RuntimeProfile::Counter* finalize_timer = nullptr; |
107 | | RuntimeProfile::Counter* create_reader_timer = nullptr; |
108 | | RuntimeProfile::Counter* pushdown_agg_timer = nullptr; |
109 | | RuntimeProfile::Counter* open_reader_timer = nullptr; |
110 | | }; |
111 | | |
112 | | struct TableReadOptions { |
113 | | // Columns need to be read from file and output by table reader. They are all in table/global |
114 | | // schema semantics. |
115 | | const std::vector<ColumnDefinition> projected_columns; |
116 | | // Simple single-column predicates, which are parsed by the scan operator. |
117 | | const TableColumnPredicates column_predicates; |
118 | | // All complex conjuncts from scan operator |
119 | | const VExprContextSPtrs conjuncts; |
120 | | // File format of the underlying data files, needed for reader initialization and reader-level |
121 | | // filter pushdown. |
122 | | const FileFormat format; |
123 | | TFileScanRangeParams* scan_params; |
124 | | std::shared_ptr<io::IOContext> io_ctx; |
125 | | RuntimeState* runtime_state; |
126 | | RuntimeProfile* scanner_profile; |
127 | | // File formats without self-describing metadata, such as CSV, need the original FE slot |
128 | | // descriptors to build their file-local schema and deserialize values. Self-describing formats |
129 | | // ignore this field and use metadata parsed from the file footer/header. |
130 | | const std::vector<SlotDescriptor*>* file_slot_descs = nullptr; |
131 | | // Push-down aggregate type. |
132 | | const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE; |
133 | | // Digest of stable pushed-down predicates. A zero digest disables condition cache. |
134 | | uint64_t condition_cache_digest = 0; |
135 | | }; |
136 | | |
137 | | struct SplitReadOptions { |
138 | | // Split-level information for reader initialization, which may include file path, partition values, delete file info, etc. The content is table format specific and opaque to table reader base class; it's the responsibility of the concrete table reader implementation to parse necessary information for reader initialization and filter pushdown. |
139 | | std::map<std::string, Field> partition_values; |
140 | | ShardedKVCache* cache; |
141 | | TFileRangeDesc current_range; |
142 | | std::optional<GlobalRowIdContext> global_rowid_context; |
143 | | }; |
144 | | |
145 | | // Base class for table-level readers. |
146 | | // This layer owns common table-level orchestration, such as split iteration, dynamic partition |
147 | | // pruning, delete handling and conversion from file-local blocks to table-schema blocks. Concrete |
148 | | // table-format readers only need to provide format-specific hooks for opening readers and parsing |
149 | | // split metadata. |
150 | | class TableReader { |
151 | | public: |
152 | 86 | virtual ~TableReader() = default; |
153 | | |
154 | | // Initialize common runtime options for the table reader. Subclasses may call this from their |
155 | | // own init(options); table-format schema and split metadata are provided later per split. |
156 | | virtual Status init(TableReadOptions&& options); |
157 | | |
158 | | // Prepare for reading a new split/task. |
159 | | // 1. Pass a new split/task to reader, which will be used in subsequent open_reader() to initialize the underlying file reader. |
160 | | // 2. Parse delete predicates from split/task information, which will be used for later dynamic filtering and delete handling. |
161 | | virtual Status prepare_split(const SplitReadOptions& options); |
162 | | |
163 | | // Public entry point for reading a table-schema block. The base class opens the current reader, |
164 | | // advances across EOF, and closes exhausted readers. Subclasses provide protected hooks for |
165 | | // table-format-specific behavior. |
166 | 80 | virtual Status get_block(Block* block, bool* eos) { |
167 | 80 | SCOPED_TIMER(_profile.exec_timer); |
168 | 80 | DORIS_CHECK(block->columns() == _projected_columns.size()); |
169 | 80 | block->clear_column_data(_projected_columns.size()); |
170 | | |
171 | 90 | while (true) { |
172 | 90 | if (*eos) { |
173 | 0 | return Status::OK(); |
174 | 0 | } |
175 | 90 | if (!_data_reader.reader) { |
176 | 81 | if (_is_table_level_count_active()) { |
177 | 6 | RETURN_IF_ERROR(_read_table_level_count(block, eos)); |
178 | 6 | return Status::OK(); |
179 | 6 | } |
180 | 75 | RETURN_IF_ERROR(create_next_reader(eos)); |
181 | 75 | if (!_data_reader.reader) { |
182 | 12 | DCHECK(*eos); |
183 | 12 | return Status::OK(); |
184 | 12 | } |
185 | 75 | } |
186 | | |
187 | | // Materialize a reduced row set for upper aggregate operators when aggregate |
188 | | // pushdown can be applied. This is not the final aggregate result: COUNT emits |
189 | | // `count` default rows for the upper COUNT(*), and MIN/MAX emits two rows containing |
190 | | // file-level min/max values for the upper MIN/MAX. |
191 | 72 | if (!_aggregate_pushdown_tried) { |
192 | 63 | SCOPED_TIMER(_profile.pushdown_agg_timer); |
193 | 63 | bool pushed_down = false; |
194 | 63 | RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down)); |
195 | 63 | if (pushed_down) { |
196 | 6 | return Status::OK(); |
197 | 6 | } |
198 | 63 | } |
199 | | |
200 | 66 | bool current_eof = false; |
201 | 66 | _data_reader.block_template.clear_column_data( |
202 | 66 | cast_set<int64_t>(_data_reader.file_block_layout.size())); |
203 | 66 | size_t current_rows = 0; |
204 | 66 | RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template, |
205 | 66 | &current_rows, &current_eof)); |
206 | 66 | if (current_rows == 0) { |
207 | 10 | if (current_eof) { |
208 | 10 | _current_reader_reached_eof = true; |
209 | 10 | RETURN_IF_ERROR(close_current_reader()); |
210 | 10 | } |
211 | 10 | continue; |
212 | 10 | } |
213 | 66 | DCHECK_EQ(_data_reader.block_template.columns(), _data_reader.file_block_layout.size()) |
214 | 0 | << _data_reader.block_template.dump_structure(); |
215 | 56 | #ifndef NDEBUG |
216 | 56 | RETURN_IF_ERROR(_check_file_block_columns("after file reader get_block", current_rows)); |
217 | 56 | #endif |
218 | 56 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
219 | 56 | RETURN_IF_ERROR(finalize_chunk(block, current_rows)); |
220 | 56 | #ifndef NDEBUG |
221 | 56 | RETURN_IF_ERROR( |
222 | 56 | _check_table_block_columns("after finalize_chunk", block, current_rows)); |
223 | 56 | #endif |
224 | 56 | if (current_eof) { |
225 | 6 | _current_reader_reached_eof = true; |
226 | 6 | RETURN_IF_ERROR(close_current_reader()); |
227 | 6 | } |
228 | 56 | return Status::OK(); |
229 | 56 | } |
230 | 80 | } |
231 | | |
232 | | // Close the table reader and the currently active file reader. Subclasses that hold additional |
233 | | // table-format resources should override this and call TableReader::close() first. |
234 | 66 | virtual Status close() { |
235 | 66 | if (_data_reader.reader) { |
236 | 41 | RETURN_IF_ERROR(close_current_reader()); |
237 | 41 | } |
238 | 66 | _current_task.reset(); |
239 | 66 | _current_file_description.reset(); |
240 | 66 | _remaining_table_level_count = -1; |
241 | 66 | return Status::OK(); |
242 | 66 | } |
243 | | |
244 | 3 | int64_t condition_cache_hit_count() const { return _condition_cache_hit_count; } |
245 | | |
246 | | virtual std::string debug_string() const; |
247 | | |
248 | | virtual Status annotate_projected_column(const TFileScanSlotInfo& slot_info, |
249 | | ProjectedColumnBuildContext* context, |
250 | | ColumnDefinition* column) const; |
251 | | |
252 | 0 | virtual Status validate_projected_columns(const ProjectedColumnBuildContext& context) const { |
253 | 0 | (void)context; |
254 | 0 | return Status::OK(); |
255 | 0 | } |
256 | | |
257 | | protected: |
258 | | // Parse deletion vector information from table format specific file description. |
259 | | virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, |
260 | 48 | DeleteFileDesc* desc, bool* has_delete_file) { |
261 | 48 | *has_delete_file = false; |
262 | 48 | return Status::OK(); |
263 | 48 | } |
264 | | |
265 | | // Advance to the next reader. This closes the current reader first and then opens the next |
266 | | // concrete reader. Subclasses should not duplicate this loop. |
267 | | Status create_next_reader(bool* eos); |
268 | | virtual Status create_file_reader(std::unique_ptr<FileReader>* reader); |
269 | 44 | virtual TableColumnMappingMode mapping_mode() const { return TableColumnMappingMode::BY_NAME; } |
270 | 63 | virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) { |
271 | 63 | DORIS_CHECK(file_schema != nullptr); |
272 | 63 | return Status::OK(); |
273 | 63 | } |
274 | | |
275 | | // Open the concrete reader for the current split/task and build the file-local scan request. |
276 | 64 | virtual Status open_reader() { |
277 | 64 | SCOPED_TIMER(_profile.open_reader_timer); |
278 | | // 1. Get file schema and create column mapping. |
279 | 64 | std::vector<ColumnDefinition> file_schema; |
280 | 64 | RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema)); |
281 | | // For Paimon/Hudi, FE can provide field ids through `history_schema_info`. Annotate the |
282 | | // file schema before column mapping when the table format maps columns by field id. |
283 | 64 | RETURN_IF_ERROR(annotate_file_schema(&file_schema)); |
284 | 64 | _data_reader.file_schema = file_schema; |
285 | 64 | _mapper_options.mode = mapping_mode(); |
286 | | |
287 | 64 | _data_reader.column_mapper = _data_reader.reader->create_column_mapper(_mapper_options); |
288 | 64 | DORIS_CHECK(_data_reader.column_mapper != nullptr); |
289 | 64 | RETURN_IF_ERROR(_data_reader.column_mapper->create_mapping(_projected_columns, |
290 | 64 | _partition_values, file_schema)); |
291 | 64 | DORIS_CHECK(_data_reader.column_mapper->mappings().size() == _projected_columns.size()); |
292 | | |
293 | | // 2. Build table filters based on conjuncts and column predicates. |
294 | 64 | RETURN_IF_ERROR(_build_table_filters_from_conjuncts()); |
295 | | |
296 | | // 3. Create file scan request based on column mapping and table filters, then open file |
297 | | // reader with the request. File scan request carries row-level expression filters and |
298 | | // file-level pruning hints. Only expression filters decide returned rows; column predicates |
299 | | // are pruning hints. |
300 | 64 | auto file_request = std::make_shared<FileScanRequest>(); |
301 | 64 | RETURN_IF_ERROR(_data_reader.column_mapper->create_scan_request( |
302 | 64 | _table_filters, _table_column_predicates, _projected_columns, file_request.get(), |
303 | 64 | _runtime_state)); |
304 | 64 | bool constant_filter_pruned_split = false; |
305 | 64 | RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split)); |
306 | 64 | if (constant_filter_pruned_split) { |
307 | 1 | RETURN_IF_ERROR(close_current_reader()); |
308 | 1 | return Status::OK(); |
309 | 1 | } |
310 | 63 | RETURN_IF_ERROR(customize_file_scan_request(file_request.get())); |
311 | 63 | RETURN_IF_ERROR(_open_local_filter_exprs(*file_request)); |
312 | 63 | _data_reader.file_block_layout.clear(); |
313 | 63 | _data_reader.block_template.clear(); |
314 | 63 | _data_reader.file_block_layout.resize(file_request->local_positions.size()); |
315 | | |
316 | | // 4. Build file block layout from file schema and column mapping. The layout describes |
317 | | // the block returned by file reader before table-column materialization. |
318 | 91 | for (const auto& [file_column_id, block_position] : file_request->local_positions) { |
319 | 91 | DORIS_CHECK(block_position.value() < _data_reader.file_block_layout.size()); |
320 | 91 | const auto* field = _find_column_definition(_data_reader.file_schema, file_column_id); |
321 | 91 | DORIS_CHECK(field != nullptr); |
322 | | |
323 | 91 | ColumnDefinition projected_field; |
324 | 91 | { |
325 | 91 | auto it = std::find_if( |
326 | 91 | file_request->non_predicate_columns.begin(), |
327 | 91 | file_request->non_predicate_columns.end(), |
328 | 111 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
329 | 91 | if (it != file_request->non_predicate_columns.end()) { |
330 | 65 | RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field)); |
331 | 65 | } |
332 | 91 | } |
333 | 91 | { |
334 | 91 | auto it = std::find_if( |
335 | 91 | file_request->predicate_columns.begin(), |
336 | 91 | file_request->predicate_columns.end(), |
337 | 91 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
338 | 91 | if (it != file_request->predicate_columns.end()) { |
339 | 26 | RETURN_IF_ERROR(project_column_definition(*field, *it, &projected_field)); |
340 | 26 | } |
341 | 91 | } |
342 | 91 | _data_reader.file_block_layout[block_position.value()] = { |
343 | 91 | .file_column_id = file_column_id, |
344 | 91 | .name = projected_field.name, |
345 | 91 | .type = projected_field.type, |
346 | 91 | }; |
347 | 91 | DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != nullptr); |
348 | 91 | } |
349 | | |
350 | | // 5. Prepare block template from file block layout. The block template stores the block |
351 | | // returned by file reader before table-column materialization. |
352 | 63 | _data_reader.block_template.reserve(_data_reader.file_block_layout.size()); |
353 | 91 | for (const auto& column : _data_reader.file_block_layout) { |
354 | 91 | _data_reader.block_template.insert( |
355 | 91 | {column.type->create_column(), column.type, column.name}); |
356 | 91 | } |
357 | 63 | if (VLOG_DEBUG_IS_ON) { |
358 | 0 | VLOG_DEBUG << "TableReader debug: " << debug_string(); |
359 | 0 | } |
360 | 63 | RETURN_IF_ERROR(_open_mapping_exprs()); |
361 | 63 | RETURN_IF_ERROR(_data_reader.reader->open(file_request)); |
362 | 63 | RETURN_IF_ERROR(_init_reader_condition_cache(*file_request)); |
363 | 63 | return Status::OK(); |
364 | 63 | } |
365 | | |
366 | | Status _build_table_filters_from_conjuncts(); |
367 | | Status _open_local_filter_exprs(const FileScanRequest& file_request); |
368 | | Status _init_reader_condition_cache(const FileScanRequest& file_request); |
369 | | void _finalize_reader_condition_cache(); |
370 | | bool _should_enable_condition_cache(const FileScanRequest& file_request) const; |
371 | | |
372 | 64 | Status _evaluate_constant_filters(bool* can_filter_all) { |
373 | 64 | DORIS_CHECK(can_filter_all != nullptr); |
374 | 64 | *can_filter_all = false; |
375 | 64 | for (const auto& table_filter : _table_filters) { |
376 | 19 | if (table_filter.conjunct == nullptr || |
377 | | // RuntimeFilterExpr does not implement execute_column_impl(); it is evaluated by |
378 | | // the row-level filter path through execute_filter(). Constant split pruning uses |
379 | | // VExprContext::execute() on a one-row synthetic block, so runtime filters must not |
380 | | // be pre-executed here even when their referenced slot maps to a constant value. |
381 | 19 | table_filter.conjunct->root()->is_rf_wrapper() || |
382 | 19 | !_table_filter_has_only_constant_entries(table_filter)) { |
383 | 17 | continue; |
384 | 17 | } |
385 | 2 | Block eval_block; |
386 | 2 | RETURN_IF_ERROR(_build_constant_filter_block(table_filter, &eval_block)); |
387 | 2 | RowDescriptor row_desc; |
388 | 2 | RETURN_IF_ERROR(table_filter.conjunct->prepare(_runtime_state, row_desc)); |
389 | 2 | RETURN_IF_ERROR(table_filter.conjunct->open(_runtime_state)); |
390 | 2 | int result_column_id = -1; |
391 | 2 | RETURN_IF_ERROR(table_filter.conjunct->execute(&eval_block, &result_column_id)); |
392 | 2 | DORIS_CHECK(result_column_id >= 0); |
393 | 2 | if (_filter_result_filters_all(eval_block.get_by_position(result_column_id).column)) { |
394 | 1 | *can_filter_all = true; |
395 | 1 | return Status::OK(); |
396 | 1 | } |
397 | 2 | } |
398 | 63 | return Status::OK(); |
399 | 64 | } |
400 | | |
401 | 17 | bool _table_filter_has_only_constant_entries(const TableFilter& table_filter) const { |
402 | 17 | const auto& filter_entries = _data_reader.column_mapper->filter_entries(); |
403 | 17 | for (const auto global_index : table_filter.global_indices) { |
404 | 17 | const auto entry_it = filter_entries.find(global_index); |
405 | 17 | if (entry_it == filter_entries.end() || !entry_it->second.is_constant()) { |
406 | 15 | return false; |
407 | 15 | } |
408 | 17 | } |
409 | 2 | return !table_filter.global_indices.empty(); |
410 | 17 | } |
411 | | |
412 | 2 | Status _build_constant_filter_block(const TableFilter& table_filter, Block* eval_block) { |
413 | 2 | DORIS_CHECK(eval_block != nullptr); |
414 | 2 | eval_block->clear(); |
415 | 2 | const auto& mappings = _data_reader.column_mapper->mappings(); |
416 | 2 | const auto& filter_entries = _data_reader.column_mapper->filter_entries(); |
417 | 2 | DORIS_CHECK(mappings.size() == _projected_columns.size()); |
418 | 4 | for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) { |
419 | 2 | const auto global_index = GlobalIndex(column_idx); |
420 | 2 | const auto& mapping = mappings[column_idx]; |
421 | 2 | const auto entry_it = filter_entries.find(global_index); |
422 | 2 | const bool referenced_by_filter = |
423 | 2 | std::find(table_filter.global_indices.begin(), |
424 | 2 | table_filter.global_indices.end(), |
425 | 2 | global_index) != table_filter.global_indices.end(); |
426 | 2 | if (referenced_by_filter && entry_it != filter_entries.end() && |
427 | 2 | entry_it->second.is_constant()) { |
428 | 2 | ColumnPtr constant_column; |
429 | 2 | RETURN_IF_ERROR(_materialize_constant_filter_column( |
430 | 2 | entry_it->second.constant_index(), &constant_column)); |
431 | 2 | eval_block->insert({std::move(constant_column), mapping.table_type, |
432 | 2 | mapping.table_column_name}); |
433 | 2 | } else { |
434 | 0 | eval_block->insert({mapping.table_type->create_column_const_with_default_value(1), |
435 | 0 | mapping.table_type, mapping.table_column_name}); |
436 | 0 | } |
437 | 2 | } |
438 | 2 | return Status::OK(); |
439 | 2 | } |
440 | | |
441 | 2 | Status _materialize_constant_filter_column(ConstantIndex constant_index, ColumnPtr* column) { |
442 | 2 | DORIS_CHECK(column != nullptr); |
443 | 2 | const auto& constant_entry = _data_reader.column_mapper->constant_map().get(constant_index); |
444 | 2 | DORIS_CHECK(constant_entry.expr != nullptr); |
445 | 2 | DORIS_CHECK(constant_entry.type != nullptr); |
446 | 2 | RowDescriptor row_desc; |
447 | 2 | RETURN_IF_ERROR(constant_entry.expr->prepare(_runtime_state, row_desc)); |
448 | 2 | RETURN_IF_ERROR(constant_entry.expr->open(_runtime_state)); |
449 | 2 | Block eval_block; |
450 | 2 | eval_block.insert({constant_entry.type->create_column_const_with_default_value(1), |
451 | 2 | constant_entry.type, "__table_reader_constant_filter"}); |
452 | 2 | int result_column_id = -1; |
453 | 2 | RETURN_IF_ERROR(constant_entry.expr->execute(&eval_block, &result_column_id)); |
454 | 2 | DORIS_CHECK(result_column_id >= 0); |
455 | 2 | *column = eval_block.get_by_position(result_column_id).column; |
456 | 2 | DORIS_CHECK((*column)->size() == 1); |
457 | 2 | return Status::OK(); |
458 | 2 | } |
459 | | |
460 | 2 | static bool _filter_result_filters_all(const ColumnPtr& filter_column) { |
461 | 2 | DORIS_CHECK(filter_column.get() != nullptr); |
462 | 2 | DORIS_CHECK(filter_column->size() == 1); |
463 | 2 | return !filter_column->get_bool(0); |
464 | 2 | } |
465 | | |
466 | 64 | virtual Status customize_file_scan_request(FileScanRequest* file_request) { |
467 | 64 | return _append_delete_predicate(file_request); |
468 | 64 | } |
469 | | |
470 | 178 | bool _is_table_level_count_active() const { return _remaining_table_level_count >= 0; } |
471 | | |
472 | 6 | Status _materialize_count_rows(size_t rows, Block* block) const { |
473 | 6 | DORIS_CHECK(block != nullptr); |
474 | 6 | DORIS_CHECK(block->columns() > 0 || rows == 0); |
475 | 12 | for (size_t column_idx = 0; column_idx < block->columns(); ++column_idx) { |
476 | 6 | auto column = block->get_by_position(column_idx).type->create_column(); |
477 | 6 | column->resize(rows); |
478 | 6 | block->replace_by_position(column_idx, std::move(column)); |
479 | 6 | } |
480 | 6 | return Status::OK(); |
481 | 6 | } |
482 | | |
483 | 6 | Status _read_table_level_count(Block* block, bool* eos) { |
484 | 6 | DORIS_CHECK(block != nullptr); |
485 | 6 | DORIS_CHECK(eos != nullptr); |
486 | 6 | DORIS_CHECK(_push_down_agg_type == TPushAggOp::type::COUNT); |
487 | 6 | DORIS_CHECK(_remaining_table_level_count >= 0); |
488 | 6 | if (_remaining_table_level_count == 0) { |
489 | 2 | _remaining_table_level_count = -1; |
490 | 2 | _current_task.reset(); |
491 | 2 | *eos = true; |
492 | 2 | return Status::OK(); |
493 | 2 | } |
494 | | |
495 | 4 | const int64_t batch_size = _runtime_state == nullptr |
496 | 4 | ? _remaining_table_level_count |
497 | 4 | : static_cast<int64_t>(_runtime_state->batch_size()); |
498 | 4 | const auto rows = std::min(_remaining_table_level_count, batch_size); |
499 | 4 | RETURN_IF_ERROR(_materialize_count_rows(cast_set<size_t>(rows), block)); |
500 | 4 | _remaining_table_level_count -= rows; |
501 | 4 | *eos = false; |
502 | 4 | return Status::OK(); |
503 | 4 | } |
504 | | |
505 | | void _append_file_scan_column(FileScanRequest* request, LocalColumnId column_id, |
506 | 24 | std::vector<LocalColumnIndex>* scan_columns) { |
507 | 24 | DORIS_CHECK(request != nullptr); |
508 | 24 | DORIS_CHECK(scan_columns != nullptr); |
509 | 24 | FileScanRequestBuilder builder(request); |
510 | 24 | Status status; |
511 | 24 | if (scan_columns == &request->predicate_columns) { |
512 | 13 | status = builder.add_predicate_column(column_id); |
513 | 13 | } else { |
514 | 11 | DORIS_CHECK(scan_columns == &request->non_predicate_columns); |
515 | 11 | status = builder.add_non_predicate_column(column_id); |
516 | 11 | } |
517 | 24 | DORIS_CHECK(status.ok()) << status.to_string(); |
518 | 24 | if (column_id == LocalColumnId(ROW_POSITION_COLUMN_ID) && |
519 | 24 | _find_column_definition(_data_reader.file_schema, column_id) == nullptr) { |
520 | 17 | _data_reader.file_schema.push_back(row_position_column_definition()); |
521 | 17 | } |
522 | 24 | } |
523 | | |
524 | | // Append DeletePredicate to the file scan request if there are deletes. The predicate is evaluated at the file reader level and filters out deleted rows before data is returned to the table reader. |
525 | 64 | Status _append_delete_predicate(FileScanRequest* request) { |
526 | 64 | DORIS_CHECK(request != nullptr); |
527 | 64 | if (_delete_rows == nullptr || _delete_rows->empty()) { |
528 | 54 | return Status::OK(); |
529 | 54 | } |
530 | 10 | const auto row_position_column_id = LocalColumnId(ROW_POSITION_COLUMN_ID); |
531 | 10 | _append_file_scan_column(request, row_position_column_id, &request->predicate_columns); |
532 | | |
533 | 10 | auto delete_predicate = std::make_shared<DeletePredicate>(*_delete_rows); |
534 | 10 | const auto block_position = request->local_positions.at(row_position_column_id); |
535 | 10 | delete_predicate->add_child(VSlotRef::create_shared( |
536 | 10 | cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1, |
537 | 10 | std::make_shared<DataTypeInt64>(), ROW_POSITION_COLUMN_NAME)); |
538 | | |
539 | 10 | request->delete_conjuncts.push_back( |
540 | 10 | VExprContext::create_shared(std::move(delete_predicate))); |
541 | 10 | return Status::OK(); |
542 | 64 | } |
543 | | |
544 | | // Close the current concrete reader. This hook is called by both create_next_reader() and |
545 | | // close(), so it should remain idempotent. |
546 | 64 | virtual Status close_current_reader() { |
547 | 64 | _finalize_reader_condition_cache(); |
548 | 64 | RETURN_IF_ERROR(_data_reader.reader->close()); |
549 | 64 | _data_reader.reader.reset(); |
550 | 64 | if (_data_reader.column_mapper != nullptr) { |
551 | 64 | _data_reader.column_mapper->clear(); |
552 | 64 | _data_reader.column_mapper.reset(); |
553 | 64 | } |
554 | 64 | _table_filters.clear(); |
555 | 64 | _data_reader.file_schema.clear(); |
556 | 64 | _data_reader.file_block_layout.clear(); |
557 | 64 | _data_reader.block_template.clear(); |
558 | 64 | _current_task.reset(); |
559 | 64 | _current_file_description.reset(); |
560 | 64 | _current_reader_reached_eof = false; |
561 | 64 | return Status::OK(); |
562 | 64 | } |
563 | | |
564 | | // Finalize file-local block to table/global schema block. |
565 | 56 | Status finalize_chunk(Block* block, const size_t rows) { |
566 | 56 | SCOPED_TIMER(_profile.finalize_timer); |
567 | 56 | size_t idx = 0; |
568 | 84 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
569 | 84 | ColumnPtr column; |
570 | 84 | RETURN_IF_ERROR(_materialize_mapping_column(mapping, &_data_reader.block_template, rows, |
571 | 84 | &column)); |
572 | 84 | block->replace_by_position(idx, IColumn::mutate(std::move(column))); |
573 | 84 | idx++; |
574 | 84 | } |
575 | 56 | RETURN_IF_ERROR(materialize_virtual_columns(block)); |
576 | | // Enforce CHAR/VARCHAR length declared by the table schema after all file-to-table |
577 | | // materialization has finished. |
578 | 56 | RETURN_IF_ERROR(_truncate_char_or_varchar_columns(block)); |
579 | 56 | return Status::OK(); |
580 | 56 | } |
581 | | |
582 | | // Materialize virtual columns in the table block, such as Iceberg _row_id and |
583 | | // _last_updated_sequence_number. This runs after normal column materialization so finalize |
584 | | // expressions can reference those virtual columns. |
585 | 37 | virtual Status materialize_virtual_columns(Block* table_block) { return Status::OK(); } |
586 | | |
587 | | #ifndef NDEBUG |
588 | 56 | Status _check_file_block_columns(std::string_view stage, size_t rows) { |
589 | 56 | DORIS_CHECK(_data_reader.block_template.columns() == _data_reader.file_block_layout.size()); |
590 | 138 | for (size_t idx = 0; idx < _data_reader.block_template.columns(); ++idx) { |
591 | 82 | const auto& file_block_column = _data_reader.file_block_layout[idx]; |
592 | 82 | const auto& column_with_type = _data_reader.block_template.get_by_position(idx); |
593 | 82 | const auto* column = column_with_type.column.get(); |
594 | 82 | try { |
595 | 82 | if (column == nullptr) { |
596 | 0 | auto st = Status::InternalError( |
597 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
598 | 0 | "type={}, column=null, expected_rows={}, reader={}", |
599 | 0 | idx, stage, file_block_column.file_column_id.value(), |
600 | 0 | file_block_column.name, |
601 | 0 | file_block_column.type == nullptr ? "null" |
602 | 0 | : file_block_column.type->get_name(), |
603 | 0 | rows, debug_string()); |
604 | 0 | LOG(WARNING) << st; |
605 | 0 | return st; |
606 | 0 | } |
607 | 82 | column->sanity_check(); |
608 | 82 | auto st = column_with_type.check_type_and_column_match(); |
609 | 82 | if (!st.ok()) { |
610 | 0 | auto contextual_status = Status::InternalError( |
611 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
612 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
613 | 0 | "reader={}", |
614 | 0 | idx, stage, file_block_column.file_column_id.value(), |
615 | 0 | file_block_column.name, |
616 | 0 | file_block_column.type == nullptr ? "null" |
617 | 0 | : file_block_column.type->get_name(), |
618 | 0 | column->get_name(), column->size(), rows, st.to_string(), |
619 | 0 | debug_string()); |
620 | 0 | LOG(WARNING) << contextual_status; |
621 | 0 | return contextual_status; |
622 | 0 | } |
623 | 82 | } catch (const Exception& e) { |
624 | 0 | auto st = Status::InternalError( |
625 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
626 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
627 | 0 | "reader={}", |
628 | 0 | idx, stage, file_block_column.file_column_id.value(), |
629 | 0 | file_block_column.name, |
630 | 0 | file_block_column.type == nullptr ? "null" |
631 | 0 | : file_block_column.type->get_name(), |
632 | 0 | column == nullptr ? "null" : column->get_name(), |
633 | 0 | column == nullptr ? 0 : column->size(), rows, e.to_string(), |
634 | 0 | debug_string()); |
635 | 0 | LOG(WARNING) << st; |
636 | 0 | return st; |
637 | 0 | } catch (const std::exception& e) { |
638 | 0 | auto st = Status::InternalError( |
639 | 0 | "Invalid file block column {} at {}: file_column_id={}, name='{}', " |
640 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
641 | 0 | "reader={}", |
642 | 0 | idx, stage, file_block_column.file_column_id.value(), |
643 | 0 | file_block_column.name, |
644 | 0 | file_block_column.type == nullptr ? "null" |
645 | 0 | : file_block_column.type->get_name(), |
646 | 0 | column == nullptr ? "null" : column->get_name(), |
647 | 0 | column == nullptr ? 0 : column->size(), rows, e.what(), debug_string()); |
648 | 0 | LOG(WARNING) << st; |
649 | 0 | return st; |
650 | 0 | } |
651 | 82 | } |
652 | 56 | return Status::OK(); |
653 | 56 | } |
654 | | |
655 | 56 | Status _check_table_block_columns(std::string_view stage, const Block* block, size_t rows) { |
656 | 56 | DORIS_CHECK(block != nullptr); |
657 | 56 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
658 | 140 | for (size_t idx = 0; idx < block->columns(); ++idx) { |
659 | 84 | const auto& mapping = _data_reader.column_mapper->mappings()[idx]; |
660 | 84 | const auto& column_with_type = block->get_by_position(idx); |
661 | 84 | const auto* column = column_with_type.column.get(); |
662 | 84 | try { |
663 | 84 | if (column == nullptr) { |
664 | 0 | auto st = Status::InternalError( |
665 | 0 | "Invalid table block column {} at {}: table_column='{}', " |
666 | 0 | "global_index={}, type={}, column=null, expected_rows={}, mapping={}", |
667 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
668 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
669 | 0 | rows, mapping.debug_string()); |
670 | 0 | LOG(WARNING) << st; |
671 | 0 | return st; |
672 | 0 | } |
673 | 84 | column->sanity_check(); |
674 | 84 | auto st = column_with_type.check_type_and_column_match(); |
675 | 84 | if (!st.ok()) { |
676 | 0 | auto contextual_status = Status::InternalError( |
677 | 0 | "Invalid table block column {} at {}: table_column='{}', " |
678 | 0 | "global_index={}, type={}, column={}, column_size={}, " |
679 | 0 | "expected_rows={}, error={}, mapping={}", |
680 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
681 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
682 | 0 | column->get_name(), column->size(), rows, st.to_string(), |
683 | 0 | mapping.debug_string()); |
684 | 0 | LOG(WARNING) << contextual_status; |
685 | 0 | return contextual_status; |
686 | 0 | } |
687 | 84 | } catch (const Exception& e) { |
688 | 0 | auto st = Status::InternalError( |
689 | 0 | "Invalid table block column {} at {}: table_column='{}', global_index={}, " |
690 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
691 | 0 | "mapping={}", |
692 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
693 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
694 | 0 | column == nullptr ? "null" : column->get_name(), |
695 | 0 | column == nullptr ? 0 : column->size(), rows, e.to_string(), |
696 | 0 | mapping.debug_string()); |
697 | 0 | LOG(WARNING) << st; |
698 | 0 | return st; |
699 | 0 | } catch (const std::exception& e) { |
700 | 0 | auto st = Status::InternalError( |
701 | 0 | "Invalid table block column {} at {}: table_column='{}', global_index={}, " |
702 | 0 | "type={}, column={}, column_size={}, expected_rows={}, error={}, " |
703 | 0 | "mapping={}", |
704 | 0 | idx, stage, mapping.table_column_name, mapping.global_index.value(), |
705 | 0 | mapping.table_type == nullptr ? "null" : mapping.table_type->get_name(), |
706 | 0 | column == nullptr ? "null" : column->get_name(), |
707 | 0 | column == nullptr ? 0 : column->size(), rows, e.what(), |
708 | 0 | mapping.debug_string()); |
709 | 0 | LOG(WARNING) << st; |
710 | 0 | return st; |
711 | 0 | } |
712 | 84 | } |
713 | 56 | return Status::OK(); |
714 | 56 | } |
715 | | #endif |
716 | | |
717 | 56 | Status _truncate_char_or_varchar_columns(Block* block) { |
718 | 56 | DORIS_CHECK(block != nullptr); |
719 | 56 | if (_runtime_state == nullptr || |
720 | 56 | !_runtime_state->query_options().truncate_char_or_varchar_columns) { |
721 | 56 | return Status::OK(); |
722 | 56 | } |
723 | 0 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
724 | 0 | for (size_t idx = 0; idx < _data_reader.column_mapper->mappings().size(); ++idx) { |
725 | 0 | const auto& mapping = _data_reader.column_mapper->mappings()[idx]; |
726 | 0 | if (!_should_truncate_char_or_varchar_column(mapping)) { |
727 | 0 | continue; |
728 | 0 | } |
729 | 0 | const auto target_len = |
730 | 0 | assert_cast<const DataTypeString*>(remove_nullable(mapping.table_type).get()) |
731 | 0 | ->len(); |
732 | 0 | _truncate_char_or_varchar_column(block, idx, target_len); |
733 | 0 | } |
734 | 0 | return Status::OK(); |
735 | 56 | } |
736 | | |
737 | | // Return true when the table schema has a bounded CHAR/VARCHAR length that is stricter than |
738 | | // the file-side type. Examples: |
739 | | // - table VARCHAR(10), file VARCHAR(20): truncate to 10; |
740 | | // - table VARCHAR(10), file STRING: truncate to 10 because STRING has no declared bound; |
741 | | // - table STRING, any file type: no truncation because the target has no bound. |
742 | 5 | static bool _should_truncate_char_or_varchar_column(const ColumnMapping& mapping) { |
743 | 5 | if (mapping.table_type == nullptr) { |
744 | 0 | return false; |
745 | 0 | } |
746 | 5 | const auto table_type = remove_nullable(mapping.table_type); |
747 | 5 | const auto primitive_type = table_type->get_primitive_type(); |
748 | 5 | if (primitive_type != TYPE_VARCHAR && primitive_type != TYPE_CHAR) { |
749 | 1 | return false; |
750 | 1 | } |
751 | 4 | const auto target_len = assert_cast<const DataTypeString*>(table_type.get())->len(); |
752 | 4 | if (target_len <= 0) { |
753 | 0 | return false; |
754 | 0 | } |
755 | 4 | if (mapping.file_type == nullptr) { |
756 | 0 | return true; |
757 | 0 | } |
758 | 4 | const auto file_type = remove_nullable(mapping.file_type); |
759 | 4 | DORIS_CHECK(file_type != nullptr); |
760 | 4 | int file_len = -1; |
761 | 4 | if (file_type->get_primitive_type() == TYPE_VARCHAR || |
762 | 4 | file_type->get_primitive_type() == TYPE_CHAR || |
763 | 4 | file_type->get_primitive_type() == TYPE_STRING) { |
764 | 3 | file_len = assert_cast<const DataTypeString*>(file_type.get())->len(); |
765 | 3 | } |
766 | | |
767 | 4 | return file_len < 0 || target_len < file_len; |
768 | 4 | } |
769 | | |
770 | | // Truncate a materialized CHAR/VARCHAR column in place by reusing the vectorized substring |
771 | | // implementation: substring(column, 1, len). Nullable columns are unwrapped before substring |
772 | | // execution and wrapped back with the original null map afterward, because substring operates |
773 | | // on the nested string payload only. |
774 | 1 | static void _truncate_char_or_varchar_column(Block* block, size_t idx, int len) { |
775 | 1 | DORIS_CHECK(block != nullptr); |
776 | 1 | auto int_type = std::make_shared<DataTypeInt32>(); |
777 | 1 | const auto num_columns_without_result = cast_set<uint32_t>(block->columns()); |
778 | 1 | auto& target = block->get_by_position(idx); |
779 | 1 | const bool is_nullable = target.type->is_nullable(); |
780 | 1 | ColumnPtr input_column = target.column; |
781 | 1 | ColumnPtr null_map_column; |
782 | 1 | if (is_nullable) { |
783 | 1 | const auto* nullable_column = assert_cast<const ColumnNullable*>(target.column.get()); |
784 | 1 | input_column = nullable_column->get_nested_column_ptr(); |
785 | 1 | null_map_column = nullable_column->get_null_map_column_ptr(); |
786 | 1 | } |
787 | 1 | block->replace_by_position(idx, std::move(input_column)); |
788 | 1 | block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(1)), |
789 | 1 | int_type, "const 1"}); |
790 | 1 | block->insert({int_type->create_column_const(block->rows(), to_field<TYPE_INT>(len)), |
791 | 1 | int_type, "const len"}); |
792 | 1 | block->insert({nullptr, std::make_shared<DataTypeString>(), "result"}); |
793 | | |
794 | 1 | ColumnNumbers temp_arguments(3); |
795 | 1 | temp_arguments[0] = cast_set<uint32_t>(idx); |
796 | 1 | temp_arguments[1] = num_columns_without_result; |
797 | 1 | temp_arguments[2] = num_columns_without_result + 1; |
798 | 1 | const uint32_t result_column_id = num_columns_without_result + 2; |
799 | 1 | SubstringUtil::substring_execute(*block, temp_arguments, result_column_id, block->rows()); |
800 | | |
801 | 1 | ColumnPtr result_column = block->get_by_position(result_column_id).column; |
802 | 1 | if (is_nullable) { |
803 | 1 | result_column = ColumnNullable::create(std::move(result_column), null_map_column); |
804 | 1 | } |
805 | 1 | block->replace_by_position(idx, std::move(result_column)); |
806 | 1 | block->erase_tail(num_columns_without_result); |
807 | 1 | } |
808 | | |
809 | 63 | Status _try_materialize_aggregate_pushdown_rows(Block* block, bool* pushed_down) { |
810 | 63 | DORIS_CHECK(block != nullptr); |
811 | 63 | DORIS_CHECK(pushed_down != nullptr); |
812 | 63 | *pushed_down = false; |
813 | 63 | block->clear_column_data(_projected_columns.size()); |
814 | 63 | _aggregate_pushdown_tried = true; |
815 | 63 | if (!_supports_aggregate_pushdown(_push_down_agg_type)) { |
816 | 56 | return Status::OK(); |
817 | 56 | } |
818 | | |
819 | 7 | FileAggregateRequest file_request; |
820 | 7 | RETURN_IF_ERROR(_build_file_aggregate_request(_push_down_agg_type, &file_request)); |
821 | 7 | FileAggregateResult file_result; |
822 | 7 | const auto status = _data_reader.reader->get_aggregate_result(file_request, &file_result); |
823 | 7 | if (status.is<ErrorCode::NOT_IMPLEMENTED_ERROR>()) { |
824 | 1 | return Status::OK(); |
825 | 1 | } |
826 | 6 | RETURN_IF_ERROR(status); |
827 | 6 | RETURN_IF_ERROR( |
828 | 6 | _materialize_aggregate_pushdown_rows(_push_down_agg_type, file_result, block)); |
829 | 6 | *pushed_down = true; |
830 | 6 | RETURN_IF_ERROR(close_current_reader()); |
831 | 6 | return Status::OK(); |
832 | 6 | } |
833 | | |
834 | 70 | virtual bool _supports_aggregate_pushdown(TPushAggOp::type agg_type) const { |
835 | | // Only COUNT and MIN/MAX can be push down. |
836 | 70 | if (agg_type != TPushAggOp::type::COUNT && agg_type != TPushAggOp::type::MINMAX) { |
837 | 48 | return false; |
838 | 48 | } |
839 | | // Only support aggregate pushdown when there is no delete, filter and column predicate, so |
840 | | // the reduced rows consumed by the upper aggregate remain semantically equivalent to a |
841 | | // normal scan. |
842 | 22 | if (_delete_rows != nullptr && !_delete_rows->empty()) { |
843 | 3 | return false; |
844 | 3 | } |
845 | 19 | if (!_table_filters.empty() || !_table_column_predicates.empty()) { |
846 | 2 | return false; |
847 | 2 | } |
848 | 17 | if (agg_type == TPushAggOp::type::COUNT) { |
849 | 5 | return true; |
850 | 5 | } |
851 | | // For MIN/MAX, only support direct file-to-table column mappings. The two emitted rows |
852 | | // must be enough for the upper MIN/MAX aggregate without evaluating default expressions or |
853 | | // virtual columns. |
854 | 14 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
855 | 14 | if (!mapping.file_local_id.has_value() || |
856 | 14 | mapping.virtual_column_type != TableVirtualColumnType::INVALID || |
857 | 14 | mapping.default_expr != nullptr || mapping.file_type == nullptr || |
858 | 14 | mapping.table_type == nullptr) { |
859 | 1 | return false; |
860 | 1 | } |
861 | 13 | if (!_can_push_down_minmax_for_mapping(mapping)) { |
862 | 1 | return false; |
863 | 1 | } |
864 | 13 | } |
865 | 10 | return true; |
866 | 12 | } |
867 | | |
868 | 84 | static ColumnPtr _detach_column(ColumnPtr column) { |
869 | 84 | DORIS_CHECK(column.get() != nullptr); |
870 | 84 | return IColumn::mutate(std::move(column)); |
871 | 84 | } |
872 | | |
873 | 58 | static Status _align_column_nullability(ColumnPtr* column, const DataTypePtr& table_type) { |
874 | 58 | DORIS_CHECK(column != nullptr); |
875 | 58 | DORIS_CHECK(column->get() != nullptr); |
876 | 58 | DORIS_CHECK(table_type != nullptr); |
877 | | // Must return non-const column |
878 | 58 | *column = (*column)->convert_to_full_column_if_const(); |
879 | 58 | if (table_type->is_nullable()) { |
880 | 23 | const auto& nested_type = |
881 | 23 | assert_cast<const DataTypeNullable&>(*table_type).get_nested_type(); |
882 | 23 | if (!(*column)->is_nullable()) { |
883 | 2 | RETURN_IF_ERROR(_align_column_nullability(column, nested_type)); |
884 | 2 | *column = make_nullable(*column); |
885 | 2 | return Status::OK(); |
886 | 2 | } |
887 | 21 | const auto& nullable_column = assert_cast<const ColumnNullable&>(**column); |
888 | 21 | ColumnPtr nested_column = nullable_column.get_nested_column_ptr(); |
889 | 21 | RETURN_IF_ERROR(_align_column_nullability(&nested_column, nested_type)); |
890 | 21 | *column = ColumnNullable::create(nested_column, |
891 | 21 | nullable_column.get_null_map_column_ptr()); |
892 | 21 | return Status::OK(); |
893 | 21 | } |
894 | 35 | if ((*column)->is_nullable()) { |
895 | 0 | const auto& nullable_column = assert_cast<const ColumnNullable&>(**column); |
896 | 0 | if (nullable_column.has_null()) { |
897 | 0 | return Status::InternalError( |
898 | 0 | "Default expression produced NULL for non-nullable table column"); |
899 | 0 | } |
900 | 0 | ColumnPtr nested_column = nullable_column.get_nested_column_ptr(); |
901 | 0 | RETURN_IF_ERROR(_align_column_nullability(&nested_column, table_type)); |
902 | 0 | *column = nested_column; |
903 | 0 | return Status::OK(); |
904 | 0 | } |
905 | 35 | if (const auto* array_type = typeid_cast<const DataTypeArray*>(table_type.get())) { |
906 | 1 | const auto& array_column = assert_cast<const ColumnArray&>(**column); |
907 | 1 | ColumnPtr nested_column = array_column.get_data_ptr(); |
908 | 1 | RETURN_IF_ERROR( |
909 | 1 | _align_column_nullability(&nested_column, array_type->get_nested_type())); |
910 | 1 | *column = ColumnArray::create(nested_column, array_column.get_offsets_ptr()); |
911 | 1 | return Status::OK(); |
912 | 1 | } |
913 | 34 | if (const auto* map_type = typeid_cast<const DataTypeMap*>(table_type.get())) { |
914 | 0 | const auto& map_column = assert_cast<const ColumnMap&>(**column); |
915 | 0 | ColumnPtr key_column = map_column.get_keys_ptr(); |
916 | 0 | ColumnPtr value_column = map_column.get_values_ptr(); |
917 | 0 | RETURN_IF_ERROR(_align_column_nullability(&key_column, map_type->get_key_type())); |
918 | 0 | RETURN_IF_ERROR(_align_column_nullability(&value_column, map_type->get_value_type())); |
919 | 0 | *column = ColumnMap::create(key_column, value_column, map_column.get_offsets_ptr()); |
920 | 0 | return Status::OK(); |
921 | 0 | } |
922 | 34 | if (const auto* struct_type = typeid_cast<const DataTypeStruct*>(table_type.get())) { |
923 | 5 | const auto& struct_column = assert_cast<const ColumnStruct&>(**column); |
924 | 5 | Columns columns = struct_column.get_columns_copy(); |
925 | 5 | DORIS_CHECK(columns.size() == struct_type->get_elements().size()); |
926 | 16 | for (size_t i = 0; i < columns.size(); ++i) { |
927 | 11 | RETURN_IF_ERROR( |
928 | 11 | _align_column_nullability(&columns[i], struct_type->get_element(i))); |
929 | 11 | } |
930 | 5 | *column = ColumnStruct::create(columns); |
931 | 5 | return Status::OK(); |
932 | 5 | } |
933 | 29 | return Status::OK(); |
934 | 34 | } |
935 | | |
936 | | static Status _execute_default_expr_without_root_type_check( |
937 | | const VExprContextSPtr& default_expr, const Block* block, |
938 | 5 | ColumnWithTypeAndName* result_data) { |
939 | 5 | DORIS_CHECK(default_expr != nullptr); |
940 | 5 | DORIS_CHECK(block != nullptr); |
941 | 5 | DORIS_CHECK(result_data != nullptr); |
942 | 5 | ColumnPtr result_column; |
943 | 5 | Status st; |
944 | 5 | RETURN_IF_CATCH_EXCEPTION({ |
945 | 5 | st = default_expr->root()->execute_column_impl(default_expr.get(), block, nullptr, |
946 | 5 | block->rows(), result_column); |
947 | 5 | }); |
948 | 5 | RETURN_IF_ERROR(st); |
949 | 5 | DORIS_CHECK(result_column.get() != nullptr); |
950 | 5 | if (result_column->size() != block->rows()) { |
951 | 0 | return Status::InternalError( |
952 | 0 | "Default expr {} return column size {} not equal to expected size {}", |
953 | 0 | default_expr->expr_name(), result_column->size(), block->rows()); |
954 | 0 | } |
955 | 5 | result_data->column = result_column; |
956 | 5 | result_data->type = default_expr->execute_type(block); |
957 | 5 | result_data->name = default_expr->expr_name(); |
958 | 5 | return Status::OK(); |
959 | 5 | } |
960 | | |
961 | | Status _cast_column_to_type(ColumnPtr* column, const DataTypePtr& file_type, |
962 | | const DataTypePtr& table_type, |
963 | 0 | const std::string& column_name) const { |
964 | 0 | DORIS_CHECK(column != nullptr); |
965 | 0 | DORIS_CHECK(column->get() != nullptr); |
966 | 0 | DORIS_CHECK(file_type != nullptr); |
967 | 0 | DORIS_CHECK(table_type != nullptr); |
968 | 0 | if (file_type->equals(*table_type)) { |
969 | 0 | return Status::OK(); |
970 | 0 | } |
971 | | |
972 | 0 | DataTypePtr input_type = file_type; |
973 | 0 | if ((*column)->is_nullable() && !input_type->is_nullable()) { |
974 | 0 | input_type = make_nullable(input_type); |
975 | 0 | } |
976 | 0 | Block cast_block; |
977 | 0 | cast_block.insert({*column, input_type, column_name}); |
978 | 0 | auto slot_ref = VSlotRef::create_shared(0, 0, -1, input_type, column_name); |
979 | 0 | auto cast_expr = Cast::create_shared(table_type); |
980 | 0 | cast_expr->add_child(std::move(slot_ref)); |
981 | 0 | auto cast_ctx = VExprContext::create_shared(std::move(cast_expr)); |
982 | 0 | RowDescriptor row_desc; |
983 | 0 | RETURN_IF_ERROR(cast_ctx->prepare(_runtime_state, row_desc)); |
984 | 0 | RETURN_IF_ERROR(cast_ctx->open(_runtime_state)); |
985 | 0 | ColumnPtr cast_column; |
986 | 0 | RETURN_IF_ERROR(cast_ctx->execute(&cast_block, cast_column)); |
987 | 0 | *column = std::move(cast_column); |
988 | 0 | return Status::OK(); |
989 | 0 | } |
990 | | |
991 | | Status _materialize_present_child_mapping_column(const ColumnMapping& mapping, |
992 | | const ColumnPtr& file_column, |
993 | 18 | const size_t rows, ColumnPtr* column) { |
994 | 18 | DORIS_CHECK(column != nullptr); |
995 | 18 | DORIS_CHECK(mapping.file_type != nullptr); |
996 | 18 | DORIS_CHECK(mapping.table_type != nullptr); |
997 | 18 | *column = file_column; |
998 | 18 | if (!mapping.is_trivial) { |
999 | 5 | if (!mapping.child_mappings.empty()) { |
1000 | 5 | RETURN_IF_ERROR( |
1001 | 5 | _materialize_complex_mapping_column(mapping, *column, rows, column)); |
1002 | 5 | } else { |
1003 | 0 | RETURN_IF_ERROR(_cast_column_to_type(column, mapping.file_type, mapping.table_type, |
1004 | 0 | mapping.file_column_name)); |
1005 | 0 | } |
1006 | 5 | } |
1007 | 18 | RETURN_IF_ERROR(_align_column_nullability(column, mapping.table_type)); |
1008 | 18 | return Status::OK(); |
1009 | 18 | } |
1010 | | |
1011 | | Status _materialize_mapping_column(const ColumnMapping& mapping, Block* current_block, |
1012 | 89 | const size_t rows, ColumnPtr* column) { |
1013 | 89 | if (!mapping.is_trivial && mapping.file_local_id.has_value() && |
1014 | 89 | !mapping.child_mappings.empty()) { |
1015 | 5 | DCHECK(mapping.projection != nullptr); |
1016 | 5 | int res_id; |
1017 | 5 | auto st = mapping.projection->execute(current_block, &res_id); |
1018 | 5 | if (!st.ok()) { |
1019 | 0 | return Status::InternalError( |
1020 | 0 | "Failed to execute complex mapping projection for table column '{}' " |
1021 | 0 | "(global_index={}, file_local_id={}, rows={}): {}, mapping={}", |
1022 | 0 | mapping.table_column_name, mapping.global_index.value(), |
1023 | 0 | *mapping.file_local_id, rows, st.to_string(), mapping.debug_string()); |
1024 | 0 | } |
1025 | 5 | ColumnPtr result_column = current_block->get_by_position(res_id).column; |
1026 | 5 | RETURN_IF_ERROR( |
1027 | 5 | _materialize_complex_mapping_column(mapping, result_column, rows, column)); |
1028 | 5 | return Status::OK(); |
1029 | 5 | } |
1030 | 84 | if (mapping.projection != nullptr) { |
1031 | 66 | int res_id; |
1032 | 66 | auto st = mapping.projection->execute(current_block, &res_id); |
1033 | 66 | if (!st.ok()) { |
1034 | 0 | std::string file_local_id = "null"; |
1035 | 0 | if (mapping.file_local_id.has_value()) { |
1036 | 0 | file_local_id = std::to_string(*mapping.file_local_id); |
1037 | 0 | } |
1038 | 0 | return Status::InternalError( |
1039 | 0 | "Failed to execute mapping projection for table column '{}' " |
1040 | 0 | "(global_index={}, file_local_id={}, rows={}): {}, mapping={}", |
1041 | 0 | mapping.table_column_name, mapping.global_index.value(), file_local_id, |
1042 | 0 | rows, st.to_string(), mapping.debug_string()); |
1043 | 0 | } |
1044 | 66 | ColumnPtr result_column = current_block->get_by_position(res_id).column; |
1045 | 66 | *column = _detach_column(std::move(result_column)); |
1046 | 66 | return Status::OK(); |
1047 | 66 | } |
1048 | 18 | if (mapping.default_expr != nullptr) { |
1049 | 5 | if (current_block->rows() == rows) { |
1050 | 0 | ColumnWithTypeAndName result; |
1051 | 0 | RETURN_IF_ERROR(_execute_default_expr_without_root_type_check( |
1052 | 0 | mapping.default_expr, current_block, &result)); |
1053 | 0 | ColumnPtr result_column = result.column; |
1054 | 0 | RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type)); |
1055 | 0 | *column = _detach_column(std::move(result_column)); |
1056 | 5 | } else { |
1057 | 5 | DORIS_CHECK(mapping.constant_index.has_value()); |
1058 | 5 | Block eval_block; |
1059 | 5 | eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows), |
1060 | 5 | mapping.table_type, "__table_reader_const_rows"}); |
1061 | 5 | ColumnWithTypeAndName result; |
1062 | 5 | RETURN_IF_ERROR(_execute_default_expr_without_root_type_check( |
1063 | 5 | mapping.default_expr, &eval_block, &result)); |
1064 | 5 | ColumnPtr result_column = result.column; |
1065 | 5 | RETURN_IF_ERROR(_align_column_nullability(&result_column, mapping.table_type)); |
1066 | 5 | *column = _detach_column(std::move(result_column)); |
1067 | 5 | } |
1068 | 5 | return Status::OK(); |
1069 | 5 | } |
1070 | 13 | ColumnPtr result_column = mapping.table_type->create_column_const_with_default_value(rows); |
1071 | 13 | *column = _detach_column(std::move(result_column)); |
1072 | 13 | return Status::OK(); |
1073 | 18 | } |
1074 | | |
1075 | | Status _materialize_complex_mapping_column(const ColumnMapping& mapping, |
1076 | | const ColumnPtr& file_column, const size_t rows, |
1077 | 10 | ColumnPtr* column) { |
1078 | 10 | DORIS_CHECK(mapping.table_type != nullptr); |
1079 | 10 | DORIS_CHECK(file_column.get() != nullptr); |
1080 | 10 | const auto table_type = remove_nullable(mapping.table_type); |
1081 | 10 | switch (table_type->get_primitive_type()) { |
1082 | 7 | case TYPE_STRUCT: |
1083 | 7 | RETURN_IF_ERROR(_materialize_struct_mapping_column(mapping, file_column, rows, column)); |
1084 | 7 | break; |
1085 | 7 | case TYPE_ARRAY: |
1086 | 2 | RETURN_IF_ERROR(_materialize_array_mapping_column(mapping, file_column, rows, column)); |
1087 | 2 | break; |
1088 | 2 | case TYPE_MAP: |
1089 | 1 | RETURN_IF_ERROR(_materialize_map_mapping_column(mapping, file_column, rows, column)); |
1090 | 1 | break; |
1091 | 1 | default: |
1092 | 0 | *column = _detach_column(file_column); |
1093 | 0 | break; |
1094 | 10 | } |
1095 | 10 | return Status::OK(); |
1096 | 10 | } |
1097 | | |
1098 | | static std::vector<const ColumnMapping*> _present_child_mappings_in_file_order( |
1099 | 7 | const std::vector<ColumnMapping>& child_mappings) { |
1100 | 7 | std::vector<const ColumnMapping*> result; |
1101 | 7 | result.reserve(child_mappings.size()); |
1102 | 16 | for (const auto& child_mapping : child_mappings) { |
1103 | 16 | if (child_mapping.file_local_id.has_value()) { |
1104 | 11 | result.push_back(&child_mapping); |
1105 | 11 | } |
1106 | 16 | } |
1107 | 7 | std::ranges::sort(result, [](const ColumnMapping* lhs, const ColumnMapping* rhs) { |
1108 | 6 | DORIS_CHECK(lhs->file_local_id.has_value()); |
1109 | 6 | DORIS_CHECK(rhs->file_local_id.has_value()); |
1110 | 6 | return *lhs->file_local_id < *rhs->file_local_id; |
1111 | 6 | }); |
1112 | 7 | return result; |
1113 | 7 | } |
1114 | | |
1115 | | static size_t _file_child_ordinal_for_mapping( |
1116 | | const ColumnMapping& mapping, const ColumnMapping& child_mapping, |
1117 | 11 | const std::vector<const ColumnMapping*>& file_ordered_children) { |
1118 | 11 | DORIS_CHECK(child_mapping.file_local_id.has_value()); |
1119 | 11 | if (!mapping.projected_file_children.empty()) { |
1120 | 7 | const auto child_it = std::ranges::find_if( |
1121 | 10 | mapping.projected_file_children, [&](const ColumnDefinition& file_child) { |
1122 | 10 | return file_child.file_local_id() == *child_mapping.file_local_id; |
1123 | 10 | }); |
1124 | 7 | DORIS_CHECK(child_it != mapping.projected_file_children.end()); |
1125 | 7 | return static_cast<size_t>( |
1126 | 7 | std::distance(mapping.projected_file_children.begin(), child_it)); |
1127 | 7 | } |
1128 | 4 | const auto child_it = std::ranges::find(file_ordered_children, &child_mapping); |
1129 | 4 | DORIS_CHECK(child_it != file_ordered_children.end()); |
1130 | 4 | return static_cast<size_t>(std::distance(file_ordered_children.begin(), child_it)); |
1131 | 11 | } |
1132 | | |
1133 | | static std::vector<const ColumnMapping*> _child_mappings_in_table_type_order( |
1134 | 7 | const ColumnMapping& mapping, const DataTypeStruct& table_type) { |
1135 | 7 | std::vector<const ColumnMapping*> result; |
1136 | 7 | result.reserve(mapping.child_mappings.size()); |
1137 | 23 | for (size_t child_idx = 0; child_idx < table_type.get_elements().size(); ++child_idx) { |
1138 | 16 | const auto& child_name = table_type.get_element_name(child_idx); |
1139 | 16 | const auto child_it = std::ranges::find_if( |
1140 | 28 | mapping.child_mappings, [&](const ColumnMapping& child_mapping) { |
1141 | 28 | return child_mapping.table_column_name == child_name; |
1142 | 28 | }); |
1143 | 16 | DORIS_CHECK(child_it != mapping.child_mappings.end()) |
1144 | 0 | << mapping.debug_string() << ", table_child_name=" << child_name; |
1145 | 16 | result.push_back(&*child_it); |
1146 | 16 | } |
1147 | 7 | return result; |
1148 | 7 | } |
1149 | | |
1150 | | static const IColumn* _nested_column_if_nullable(const ColumnPtr& column, |
1151 | 12 | const NullMap** null_map) { |
1152 | 12 | DORIS_CHECK(column.get() != nullptr); |
1153 | 12 | if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { |
1154 | 8 | if (null_map != nullptr) { |
1155 | 8 | *null_map = &nullable_column->get_null_map_data(); |
1156 | 8 | } |
1157 | 8 | return &nullable_column->get_nested_column(); |
1158 | 8 | } |
1159 | 4 | return column.get(); |
1160 | 12 | } |
1161 | | |
1162 | | Status _materialize_struct_mapping_column(const ColumnMapping& mapping, |
1163 | | const ColumnPtr& file_column, const size_t rows, |
1164 | 7 | ColumnPtr* column) { |
1165 | 7 | DORIS_CHECK(mapping.table_type != nullptr); |
1166 | 7 | const auto* table_type = |
1167 | 7 | assert_cast<const DataTypeStruct*>(remove_nullable(mapping.table_type).get()); |
1168 | 7 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
1169 | 7 | const NullMap* parent_null_map = nullptr; |
1170 | 7 | const auto* nested_file_column = |
1171 | 7 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1172 | 7 | const auto* file_struct = assert_cast<const ColumnStruct*>(nested_file_column); |
1173 | 7 | DORIS_CHECK(table_type->get_elements().size() == mapping.child_mappings.size()); |
1174 | | |
1175 | 7 | Columns child_columns; |
1176 | 7 | child_columns.reserve(mapping.child_mappings.size()); |
1177 | 7 | const auto file_ordered_children = |
1178 | 7 | _present_child_mappings_in_file_order(mapping.child_mappings); |
1179 | 7 | const auto table_ordered_children = |
1180 | 7 | _child_mappings_in_table_type_order(mapping, *table_type); |
1181 | 16 | for (const auto* child_mapping : table_ordered_children) { |
1182 | 16 | DORIS_CHECK(child_mapping != nullptr); |
1183 | 16 | if (!child_mapping->file_local_id.has_value()) { |
1184 | 5 | child_columns.push_back( |
1185 | 5 | child_mapping->table_type->create_column_const_with_default_value(rows) |
1186 | 5 | ->convert_to_full_column_if_const()); |
1187 | 5 | continue; |
1188 | 5 | } |
1189 | 11 | const auto file_child_idx = |
1190 | 11 | _file_child_ordinal_for_mapping(mapping, *child_mapping, file_ordered_children); |
1191 | 11 | DORIS_CHECK(file_child_idx < file_struct->get_columns().size()); |
1192 | 11 | ColumnPtr child_column = file_struct->get_column_ptr(file_child_idx); |
1193 | 11 | RETURN_IF_ERROR(_materialize_present_child_mapping_column(*child_mapping, child_column, |
1194 | 11 | rows, &child_column)); |
1195 | 11 | child_columns.push_back(std::move(child_column)); |
1196 | 11 | } |
1197 | 7 | MutableColumns mutable_child_columns; |
1198 | 7 | mutable_child_columns.reserve(child_columns.size()); |
1199 | 16 | for (auto& child_column : child_columns) { |
1200 | 16 | mutable_child_columns.push_back(IColumn::mutate(std::move(child_column))); |
1201 | 16 | } |
1202 | 7 | auto result = ColumnStruct::create(std::move(mutable_child_columns)); |
1203 | 7 | if (mapping.table_type->is_nullable()) { |
1204 | 5 | auto null_map = ColumnUInt8::create(); |
1205 | 5 | auto& null_map_data = null_map->get_data(); |
1206 | 5 | null_map_data.resize(rows); |
1207 | 5 | if (parent_null_map != nullptr) { |
1208 | 5 | DORIS_CHECK(parent_null_map->size() == rows); |
1209 | 5 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1210 | 5 | } else { |
1211 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1212 | 0 | } |
1213 | 5 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1214 | 5 | } else { |
1215 | 2 | *column = std::move(result); |
1216 | 2 | } |
1217 | 7 | return Status::OK(); |
1218 | 7 | } |
1219 | | |
// Rebuilds an ARRAY column read from the file according to `mapping`.
// The single element mapping is applied to the flattened element column, the file's offsets are
// reused unchanged, and the result is wrapped in ColumnNullable when the table type is nullable.
//   mapping     - must carry exactly one child mapping (enforced by DORIS_CHECK).
//   file_column - array column from the file; a const column is expanded first.
//   rows        - expected row count; sizes the null map and is checked against the file null map.
//   column      - out: receives the rebuilt column.
// Returns the error from materializing the element column, otherwise OK.
1220 | | Status _materialize_array_mapping_column(const ColumnMapping& mapping, |
1221 | | const ColumnPtr& file_column, const size_t rows, |
1222 | 2 | ColumnPtr* column) { |
1223 | 2 | DORIS_CHECK(mapping.child_mappings.size() == 1); |
1224 | 2 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
// presumably strips a ColumnNullable wrapper and reports its null map through the out-pointer;
// parent_null_map stays nullptr otherwise - confirm against _nested_column_if_nullable.
1225 | 2 | const NullMap* parent_null_map = nullptr; |
1226 | 2 | const auto* nested_file_column = |
1227 | 2 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1228 | 2 | const auto* file_array = assert_cast<const ColumnArray*>(nested_file_column); |
1229 | 2 | ColumnPtr nested_column = file_array->get_data_ptr(); |
1230 | 2 | const auto& element_mapping = mapping.child_mappings[0]; |
// The element column is materialized with its own size (all elements of all rows), not `rows`,
// and the result replaces nested_column in place.
1231 | 2 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1232 | 2 | element_mapping, nested_column, nested_column->size(), &nested_column)); |
1233 | 2 | auto offsets_column = file_array->get_offsets_ptr()->convert_to_full_column_if_const(); |
1234 | 2 | auto result = ColumnArray::create(IColumn::mutate(std::move(nested_column)), |
1235 | 2 | IColumn::mutate(std::move(offsets_column))); |
1236 | 2 | if (mapping.table_type->is_nullable()) { |
1237 | 2 | auto null_map = ColumnUInt8::create(); |
1238 | 2 | auto& null_map_data = null_map->get_data(); |
1239 | 2 | null_map_data.resize(rows); |
1240 | 2 | if (parent_null_map != nullptr) { |
// Carry the file column's row-level nulls over to the table column.
1241 | 2 | DORIS_CHECK(parent_null_map->size() == rows); |
1242 | 2 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1243 | 2 | } else { |
// No file null map: every row is marked not-null.
1244 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1245 | 0 | } |
1246 | 2 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1247 | 2 | } else { |
// NOTE(review): a non-null parent_null_map is not applied on this path - confirm that a nullable
// file column can never be mapped to a non-nullable table type here.
1248 | 0 | *column = std::move(result); |
1249 | 0 | } |
1250 | 2 | return Status::OK(); |
1251 | 2 | } |
1252 | | |
// Rebuilds a MAP column read from the file according to `mapping`.
// The key and value columns are each re-materialized when a matching child mapping exists, the
// file's offsets are reused, and the result is wrapped in ColumnNullable when the table type is
// nullable.
//   mapping     - child mappings with file_local_id 0 / 1 select the key / value mapping.
//   file_column - map column from the file; a const column is expanded first.
//   rows        - expected row count; sizes the null map and is checked against the file null map.
//   column      - out: receives the rebuilt column.
// Returns the first error from materializing the key or value column, otherwise OK.
1253 | | Status _materialize_map_mapping_column(const ColumnMapping& mapping, |
1254 | | const ColumnPtr& file_column, const size_t rows, |
1255 | 3 | ColumnPtr* column) { |
1256 | 3 | const auto full_file_column = file_column->convert_to_full_column_if_const(); |
// presumably strips a ColumnNullable wrapper and reports its null map through the out-pointer;
// parent_null_map stays nullptr otherwise - confirm against _nested_column_if_nullable.
1257 | 3 | const NullMap* parent_null_map = nullptr; |
1258 | 3 | const auto* nested_file_column = |
1259 | 3 | _nested_column_if_nullable(full_file_column, &parent_null_map); |
1260 | 3 | const auto* file_map = assert_cast<const ColumnMap*>(nested_file_column); |
1261 | 3 | ColumnPtr key_column = file_map->get_keys_ptr(); |
1262 | 3 | ColumnPtr value_column = file_map->get_values_ptr(); |
1263 | |
// Pick the key/value mappings by file-local id. Children without an id are skipped, ids other
// than 0 and 1 are ignored, and if an id repeats the last matching child wins.
1264 | 3 | const ColumnMapping* key_mapping = nullptr; |
1265 | 3 | const ColumnMapping* value_mapping = nullptr; |
1266 | 5 | for (const auto& child_mapping : mapping.child_mappings) { |
1267 | 5 | if (!child_mapping.file_local_id.has_value()) { |
1268 | 0 | continue; |
1269 | 0 | } |
1270 | 5 | if (*child_mapping.file_local_id == 0) { |
1271 | 2 | key_mapping = &child_mapping; |
1272 | 3 | } else if (*child_mapping.file_local_id == 1) { |
1273 | 3 | value_mapping = &child_mapping; |
1274 | 3 | } |
1275 | 5 | } |
1276 | |
// A side without a mapping keeps the file's column untouched. Each side is materialized with its
// own flattened size, not `rows`.
1277 | 3 | if (key_mapping != nullptr) { |
1278 | 2 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1279 | 2 | *key_mapping, key_column, key_column->size(), &key_column)); |
1280 | 2 | } |
1281 | 3 | if (value_mapping != nullptr) { |
1282 | 3 | RETURN_IF_ERROR(_materialize_present_child_mapping_column( |
1283 | 3 | *value_mapping, value_column, value_column->size(), &value_column)); |
1284 | 3 | } |
1285 | 3 | auto offsets_column = file_map->get_offsets_ptr()->convert_to_full_column_if_const(); |
1286 | 3 | auto result = ColumnMap::create(IColumn::mutate(std::move(key_column)), |
1287 | 3 | IColumn::mutate(std::move(value_column)), |
1288 | 3 | IColumn::mutate(std::move(offsets_column))); |
1289 | 3 | if (mapping.table_type->is_nullable()) { |
1290 | 1 | auto null_map = ColumnUInt8::create(); |
1291 | 1 | auto& null_map_data = null_map->get_data(); |
1292 | 1 | null_map_data.resize(rows); |
1293 | 1 | if (parent_null_map != nullptr) { |
// Carry the file column's row-level nulls over to the table column.
1294 | 1 | DORIS_CHECK(parent_null_map->size() == rows); |
1295 | 1 | null_map_data.assign(parent_null_map->begin(), parent_null_map->end()); |
1296 | 1 | } else { |
// No file null map: every row is marked not-null.
1297 | 0 | std::fill(null_map_data.begin(), null_map_data.end(), 0); |
1298 | 0 | } |
1299 | 1 | *column = ColumnNullable::create(std::move(result), std::move(null_map)); |
1300 | 2 | } else { |
// NOTE(review): a non-null parent_null_map is not applied on this path - confirm that a nullable
// file column can never be mapped to a non-nullable table type here.
1301 | 2 | *column = std::move(result); |
1302 | 2 | } |
1303 | 3 | return Status::OK(); |
1304 | 3 | } |
1305 | | |
// Prepares and opens the expressions attached to every column mapping of the current mapper.
// For each mapping, `projection` and `default_expr` are handled independently and skipped when
// null. Returns the first prepare/open error, otherwise OK.
1306 | 63 | Status _open_mapping_exprs() { |
// A default-constructed RowDescriptor is passed to every prepare() call.
1307 | 63 | RowDescriptor row_desc; |
1308 | 93 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
1309 | 93 | if (mapping.projection != nullptr) { |
1310 | 75 | RETURN_IF_ERROR(mapping.projection->prepare(_runtime_state, row_desc)); |
1311 | 75 | RETURN_IF_ERROR(mapping.projection->open(_runtime_state)); |
1312 | 75 | } |
1313 | 93 | if (mapping.default_expr != nullptr) { |
1314 | 5 | RETURN_IF_ERROR(mapping.default_expr->prepare(_runtime_state, row_desc)); |
1315 | 5 | RETURN_IF_ERROR(mapping.default_expr->open(_runtime_state)); |
1316 | 5 | } |
1317 | 93 | } |
1318 | 63 | return Status::OK(); |
1319 | 63 | } |
1320 | | |
// Fills `request` with the aggregate type and the file columns the file reader must aggregate.
//   agg_type - must satisfy _supports_aggregate_pushdown (enforced by DORIS_CHECK).
//   request  - out, must be non-null; its previous column list is discarded.
// COUNT requests carry no columns. For any other type one request column is produced per column
// mapping, in mapping order. Returns the error from build_aggregate_projection, otherwise OK.
1321 | | Status _build_file_aggregate_request(TPushAggOp::type agg_type, |
1322 | 7 | FileAggregateRequest* request) const { |
1323 | 7 | DORIS_CHECK(request != nullptr); |
1324 | 7 | DORIS_CHECK(_supports_aggregate_pushdown(agg_type)); |
1325 | 7 | request->agg_type = agg_type; |
1326 | 7 | request->columns.clear(); |
1327 | 7 | if (agg_type == TPushAggOp::type::COUNT) { |
1328 | 2 | return Status::OK(); |
1329 | 2 | } |
1330 | 5 | request->columns.reserve(_data_reader.column_mapper->mappings().size()); |
1331 | 6 | for (const auto& mapping : _data_reader.column_mapper->mappings()) { |
// Every mapping must be backed by a file column on this path.
1332 | 6 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1333 | 6 | FileAggregateRequest::Column column; |
1334 | 6 | column.projection = LocalColumnIndex::top_level(LocalColumnId(*mapping.file_local_id)); |
// For nested mappings build_aggregate_projection reassigns the whole projection (it starts from
// LocalColumnIndex::local), so the top_level value set above is replaced.
// NOTE(review): confirm top_level() and local() are interchangeable for the root column.
1335 | 6 | if (!mapping.child_mappings.empty()) { |
1336 | 1 | RETURN_IF_ERROR(build_aggregate_projection(mapping, &column.projection)); |
1337 | 1 | } |
1338 | 6 | request->columns.push_back(std::move(column)); |
1339 | 6 | } |
1340 | 5 | return Status::OK(); |
1341 | 5 | } |
1342 | | |
// Turns a file-level aggregate result into the rows that are handed to the upper aggregate.
//   agg_type    - COUNT takes the row-count path; every other type takes the min/max path.
//   file_result - aggregate result from the file reader, one entry per column mapping.
//   block       - out: must already hold one column per mapping; columns are replaced in place.
//                 NOTE(review): dereferenced without a null check on the min/max path.
// Returns NotSupported when a column lacks a min or max value, or the first error from the
// helpers it calls; otherwise OK.
1343 | | Status _materialize_aggregate_pushdown_rows(TPushAggOp::type agg_type, |
1344 | | const FileAggregateResult& file_result, |
1345 | 6 | Block* block) { |
1346 | 6 | if (agg_type == TPushAggOp::type::COUNT) { |
1347 | | // COUNT pushdown is not a final count value. It emits `count` default rows so the |
1348 | | // upper COUNT(*) aggregate can count them and produce the final result, including |
1349 | | // zero rows when count is 0. |
1350 | 2 | DORIS_CHECK(file_result.count >= 0); |
1351 | 2 | return _materialize_count_rows(cast_set<size_t>(file_result.count), block); |
1352 | 2 | } |
1353 | | // MIN/MAX pushdown emits two rows, min first and max second, for each projected column. |
1354 | | // The upper MIN/MAX aggregate consumes those two rows to produce the final aggregate value. |
1355 | 4 | DORIS_CHECK(file_result.columns.size() == _data_reader.column_mapper->mappings().size()); |
1356 | 4 | DORIS_CHECK(block->columns() == _data_reader.column_mapper->mappings().size()); |
// Step 1: build an empty block shaped like the file reader's output.
1357 | 4 | Block file_block; |
1358 | 4 | file_block.reserve(_data_reader.file_block_layout.size()); |
1359 | 5 | for (const auto& column : _data_reader.file_block_layout) { |
1360 | 5 | file_block.insert({column.type->create_column(), column.type, column.name}); |
1361 | 5 | } |
// Step 2: for each aggregated column, fill the matching layout position with {min, max}.
1362 | 9 | for (size_t column_idx = 0; column_idx < file_result.columns.size(); ++column_idx) { |
1363 | 5 | const auto& result_column = file_result.columns[column_idx]; |
1364 | 5 | if (!result_column.has_min || !result_column.has_max) { |
// NOTE(review): the name lookup assumes _projected_columns is index-aligned with
// file_result.columns - confirm.
1365 | 0 | return Status::NotSupported("Missing min/max aggregate result for column {}", |
1366 | 0 | _projected_columns[column_idx].name); |
1367 | 0 | } |
1368 | 5 | bool found_file_column = false; |
// Linear search for the first layout entry whose file column id equals the projection's column id.
1369 | 6 | for (size_t block_position = 0; block_position < _data_reader.file_block_layout.size(); |
1370 | 6 | ++block_position) { |
1371 | 6 | if (_data_reader.file_block_layout[block_position].file_column_id == |
1372 | 6 | file_result.columns[column_idx].projection.column_id()) { |
1373 | 5 | found_file_column = true; |
// A fresh column is created, so a position hit twice would keep only the later pair.
1374 | 5 | auto column = file_block.get_by_position(block_position) |
1375 | 5 | .type->create_column() |
1376 | 5 | ->assert_mutable(); |
1377 | 5 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1378 | 5 | file_result.columns[column_idx].projection, result_column.min_value, |
1379 | 5 | column.get())); |
1380 | 5 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1381 | 5 | file_result.columns[column_idx].projection, result_column.max_value, |
1382 | 5 | column.get())); |
1383 | 5 | file_block.replace_by_position(block_position, std::move(column)); |
1384 | 5 | break; |
1385 | 5 | } |
1386 | 6 | } |
1387 | 5 | DORIS_CHECK(found_file_column); |
1388 | 5 | } |
// Step 3: map the two-row file block into table columns, one per mapping.
// NOTE(review): layout positions not referenced by any result column still hold 0 rows while the
// call below is told there are 2 - confirm this cannot happen or is handled downstream.
1389 | 9 | for (size_t column_idx = 0; column_idx < _data_reader.column_mapper->mappings().size(); |
1390 | 5 | ++column_idx) { |
1391 | 5 | ColumnPtr table_column; |
1392 | 5 | RETURN_IF_ERROR( |
1393 | 5 | _materialize_mapping_column(_data_reader.column_mapper->mappings()[column_idx], |
1394 | 5 | &file_block, 2, &table_column)); |
1395 | 5 | block->replace_by_position(column_idx, std::move(table_column)); |
1396 | 5 | } |
1397 | 4 | return Status::OK(); |
1398 | 4 | } |
1399 | | |
// Describes one column of the block exchanged with the file reader: the file column it comes
// from, plus the name and type used when that column is inserted into a Block.
1400 | | struct FileBlockColumn { |
// Defaults to the invalid id until assigned.
1401 | | LocalColumnId file_column_id = LocalColumnId::invalid(); |
1402 | | std::string name; |
1403 | | DataTypePtr type; |
1404 | | }; |
1405 | | |
// Groups the file reader with the column mapper, file schema and block layout used with it.
1406 | | struct DataReader { |
1407 | | std::unique_ptr<FileReader> reader; |
// Owns the table-to-file column mappings iterated by the materialization helpers.
1408 | | std::unique_ptr<TableColumnMapper> column_mapper; |
1409 | | // Schema of the data file, also including virtual column (row position). |
1410 | | std::vector<ColumnDefinition> file_schema; |
1411 | | // Layout of the block returned by file reader, determined by column mapping and file |
1412 | | // schema. It is used for file reader to materialize columns into correct type and position. |
1413 | | std::vector<FileBlockColumn> file_block_layout; |
// NOTE(review): presumably a pre-built empty block matching file_block_layout - confirm at the
// point where it is filled.
1414 | | Block block_template; |
1415 | | }; |
// Reader, mapper and layout consulted by the materialization and pushdown helpers.
1416 | | DataReader _data_reader; |
// The `name` of each entry is used in the aggregate-pushdown NotSupported message.
1417 | | std::vector<ColumnDefinition> _projected_columns; |
1418 | | std::unique_ptr<ScanTask> _current_task; |
1419 | | std::optional<io::FileDescription> _current_file_description; |
1420 | | // Range-level compression has higher priority than scan-param compression. TVF/load can keep |
1421 | | // the logical format as CSV/TEXT while carrying the concrete compression such as GZ or LZO on |
1422 | | // each TFileRangeDesc, matching the old FileScanner reader contract. |
1423 | | TFileCompressType::type _current_range_compress_type = TFileCompressType::UNKNOWN; |
1424 | | std::optional<TUniqueId> _current_range_load_id; |
1425 | | std::shared_ptr<io::FileSystemProperties> _system_properties; |
1426 | | // partition key -> value |
1427 | | std::map<std::string, Field> _partition_values; |
1428 | | // Predicates built from scan conjuncts before file-level localization. |
1429 | | std::vector<TableFilter> _table_filters; |
1430 | | TableColumnPredicates _table_column_predicates; |
1431 | | VExprContextSPtrs _conjuncts; |
1432 | | ReadProfile _profile; |
1433 | | // Parsed from row-position based delete files, including position delete and deletion vector. |
1434 | | DeleteRows* _delete_rows = nullptr; |
// NOTE(review): unlike _delete_rows and _file_slot_descs, this raw pointer has no in-class
// initializer; presumably a constructor outside this chunk always sets it - confirm.
1435 | | TFileScanRangeParams* _scan_params; |
1436 | | std::shared_ptr<io::IOContext> _io_ctx; |
// NOTE(review): the next two raw pointers also have no in-class initializer - confirm they are
// always set before _open_mapping_exprs passes _runtime_state to prepare()/open().
1437 | | RuntimeState* _runtime_state; |
1438 | | RuntimeProfile* _scanner_profile; |
1439 | | const std::vector<SlotDescriptor*>* _file_slot_descs = nullptr; |
// NOTE(review): no in-class initializer - confirm it is always set at construction.
1440 | | FileFormat _format; |
1441 | | TPushAggOp::type _push_down_agg_type = TPushAggOp::type::NONE; |
1442 | | uint64_t _condition_cache_digest = 0; |
1443 | | segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key; |
1444 | | std::shared_ptr<std::vector<bool>> _condition_cache; |
1445 | | std::shared_ptr<ConditionCacheContext> _condition_cache_ctx; |
1446 | | int64_t _condition_cache_hit_count = 0; |
1447 | | bool _current_reader_reached_eof = false; |
// NOTE(review): -1 looks like a "not set" sentinel - confirm against the code that reads it.
1448 | | int64_t _remaining_table_level_count = -1; |
1449 | | std::optional<GlobalRowIdContext> _global_rowid_context; |
1450 | | bool _aggregate_pushdown_tried = false; |
1451 | | TableColumnMapperOptions _mapper_options; |
1452 | | |
1453 | | private: |
// Linear search of `schema` for the first definition whose file_local_id() equals
// `column_id.value()`.
// Returns a pointer to the element inside `schema` (valid only while that vector is alive and
// unmodified), or nullptr when no definition matches.
1454 | | static const ColumnDefinition* _find_column_definition( |
1455 | 110 | const std::vector<ColumnDefinition>& schema, LocalColumnId column_id) { |
1456 | 219 | for (const auto& field : schema) { |
1457 | 219 | if (field.file_local_id() == column_id.value()) { |
1458 | 93 | return &field; |
1459 | 93 | } |
1460 | 219 | } |
1461 | 17 | return nullptr; |
1462 | 110 | } |
1463 | | |
// Decides whether min/max can be pushed down for `mapping`.
// A mapping without child mappings always qualifies. A mapping with children qualifies only if
// its file type (nullability removed) is a struct, exactly one child is backed by a file column,
// and that child qualifies recursively.
1464 | 15 | static bool _can_push_down_minmax_for_mapping(const ColumnMapping& mapping) { |
1465 | 15 | if (mapping.child_mappings.empty()) { |
1466 | 12 | return true; |
1467 | 12 | } |
1468 | 3 | const auto primitive_type = remove_nullable(mapping.file_type)->get_primitive_type(); |
// Any nested type other than struct is rejected.
1469 | 3 | if (primitive_type != TYPE_STRUCT) { |
1470 | 1 | return false; |
1471 | 1 | } |
// Count the children that have a file-local id; remember the last one seen.
1472 | 2 | size_t mapped_children = 0; |
1473 | 2 | const ColumnMapping* mapped_child = nullptr; |
1474 | 2 | for (const auto& child_mapping : mapping.child_mappings) { |
1475 | 2 | if (!child_mapping.file_local_id.has_value()) { |
1476 | 0 | continue; |
1477 | 0 | } |
1478 | 2 | ++mapped_children; |
1479 | 2 | mapped_child = &child_mapping; |
1480 | 2 | } |
// Exactly one mapped child, matching the single-child checks in build_aggregate_projection and
// _insert_aggregate_projection_value.
1481 | 2 | return mapped_children == 1 && mapped_child != nullptr && |
1482 | 2 | _can_push_down_minmax_for_mapping(*mapped_child); |
1483 | 3 | } |
1484 | | |
// Builds the projection path for `mapping` into `*projection`, overwriting whatever it held.
// A mapping without children projects all children. A mapping with children gets one child
// projection per child that has a file-local id, built recursively, and exactly one such child
// is required (DORIS_CHECK).
//   mapping    - must have a file_local_id (DORIS_CHECK).
//   projection - out, must be non-null.
// Always returns OK in the visible code; violations abort through DORIS_CHECK.
// NOTE(review): the name lacks the leading underscore used by the sibling private helpers.
1485 | | static Status build_aggregate_projection(const ColumnMapping& mapping, |
1486 | 2 | LocalColumnIndex* projection) { |
1487 | 2 | DORIS_CHECK(projection != nullptr); |
1488 | 2 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1489 | 2 | *projection = LocalColumnIndex::local(*mapping.file_local_id); |
1490 | 2 | projection->children.clear(); |
1491 | 2 | projection->project_all_children = true; |
1492 | 2 | if (mapping.child_mappings.empty()) { |
1493 | 1 | return Status::OK(); |
1494 | 1 | } |
// Nested mapping: switch to an explicit child list.
1495 | 1 | projection->project_all_children = false; |
1496 | 1 | for (const auto& child_mapping : mapping.child_mappings) { |
// Children without a file-local id are left out of the projection.
1497 | 1 | if (!child_mapping.file_local_id.has_value()) { |
1498 | 0 | continue; |
1499 | 0 | } |
1500 | 1 | LocalColumnIndex child_projection; |
1501 | 1 | RETURN_IF_ERROR(build_aggregate_projection(child_mapping, &child_projection)); |
1502 | 1 | projection->children.push_back(std::move(child_projection)); |
1503 | 1 | } |
1504 | 1 | DORIS_CHECK(projection->children.size() == 1); |
1505 | 1 | return Status::OK(); |
1506 | 1 | } |
1507 | | |
// Appends one value to `column`, descending along `projection` to reach the target sub-column.
//   projection - path to follow; each non-leaf level must have exactly one child (DORIS_CHECK).
//   value      - the value to insert at the end of the path.
//   column     - destination, must be non-null; may be nullable and/or a single-field struct.
// Always returns OK in the visible code; violations abort through DORIS_CHECK / assert_cast.
1508 | | static Status _insert_aggregate_projection_value(const LocalColumnIndex& projection, |
1509 | 24 | const Field& value, IColumn* column) { |
1510 | 24 | DORIS_CHECK(column != nullptr); |
// Nullable wrapper: insert into the nested column with the same projection, then add a null flag.
1511 | 24 | if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) { |
1512 | 12 | RETURN_IF_ERROR(_insert_aggregate_projection_value( |
1513 | 12 | projection, value, &nullable_column->get_nested_column())); |
// NOTE(review): the flag is always 0 (not null), whatever `value` holds - presumably callers
// only pass non-null min/max values; confirm.
1514 | 12 | nullable_column->get_null_map_data().push_back(0); |
1515 | 12 | return Status::OK(); |
1516 | 12 | } |
// End of the path: insert the value directly.
1517 | 12 | if (projection.project_all_children || projection.children.empty()) { |
1518 | 10 | column->insert(value); |
1519 | 10 | return Status::OK(); |
1520 | 10 | } |
// Otherwise the column must be a struct holding exactly the one projected child.
1521 | 2 | auto* struct_column = assert_cast<ColumnStruct*>(column); |
1522 | 2 | DORIS_CHECK(projection.children.size() == 1); |
1523 | 2 | const auto& child_projection = projection.children[0]; |
1524 | 2 | DORIS_CHECK(struct_column->get_columns().size() == 1); |
1525 | 2 | RETURN_IF_ERROR(_insert_aggregate_projection_value(child_projection, value, |
1526 | 2 | &struct_column->get_column(0))); |
1527 | 2 | return Status::OK(); |
1528 | 2 | } |
1529 | | |
1530 | | // Parse row-position deletes from table format specific parameters, and fill in _delete_rows. |
// Declaration only; the definition is not part of this section of the file.
1531 | | Status _parse_delete_predicates(const SplitReadOptions& options); |
1532 | | }; |
1533 | | |
1534 | | } // namespace doris::format |