Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/column_mapper.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstddef> |
22 | | #include <memory> |
23 | | #include <optional> |
24 | | #include <span> |
25 | | #include <sstream> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/exception.h" |
30 | | #include "common/status.h" |
31 | | #include "core/data_type/convert_field_to_type.h" |
32 | | #include "core/data_type/data_type_nullable.h" |
33 | | #include "core/data_type/data_type_struct.h" |
34 | | #include "core/data_type/primitive_type.h" |
35 | | #include "exprs/create_predicate_function.h" |
36 | | #include "exprs/vcast_expr.h" |
37 | | #include "exprs/vcompound_pred.h" |
38 | | #include "exprs/vdirect_in_predicate.h" |
39 | | #include "exprs/vectorized_fn_call.h" |
40 | | #include "exprs/vexpr_context.h" |
41 | | #include "exprs/vin_predicate.h" |
42 | | #include "format_v2/expr/cast.h" |
43 | | #include "format_v2/expr/literal.h" |
44 | | #include "format_v2/expr/slot_ref.h" |
45 | | #include "format_v2/file_reader.h" |
46 | | #include "format_v2/schema_projection.h" |
47 | | #include "format_v2/table_reader.h" |
48 | | #include "gen_cpp/Exprs_types.h" |
49 | | #include "storage/predicate/predicate_creator.h" |
50 | | |
51 | | namespace doris::format { |
52 | | |
53 | | namespace { |
54 | | |
55 | 0 | std::string mapping_mode_to_string(TableColumnMappingMode mode) { |
56 | 0 | switch (mode) { |
57 | 0 | case TableColumnMappingMode::BY_FIELD_ID: |
58 | 0 | return "BY_FIELD_ID"; |
59 | 0 | case TableColumnMappingMode::BY_NAME: |
60 | 0 | return "BY_NAME"; |
61 | 0 | case TableColumnMappingMode::BY_INDEX: |
62 | 0 | return "BY_INDEX"; |
63 | 0 | } |
64 | 0 | return "UNKNOWN"; |
65 | 0 | } |
66 | | |
67 | 0 | bool column_has_name(const ColumnDefinition& column, const std::string& name) { |
68 | 0 | if (to_lower(column.name) == to_lower(name)) { |
69 | 0 | return true; |
70 | 0 | } |
71 | 0 | if (column.has_identifier_name() && to_lower(column.get_identifier_name()) == to_lower(name)) { |
72 | 0 | return true; |
73 | 0 | } |
74 | 0 | return std::ranges::any_of(column.name_mapping, [&](const std::string& alias) { |
75 | 0 | return to_lower(alias) == to_lower(name); |
76 | 0 | }); |
77 | 0 | } |
78 | | |
79 | 0 | bool column_names_match(const ColumnDefinition& lhs, const ColumnDefinition& rhs) { |
80 | 0 | if (column_has_name(rhs, lhs.name)) { |
81 | 0 | return true; |
82 | 0 | } |
83 | 0 | if (lhs.has_identifier_name() && column_has_name(rhs, lhs.get_identifier_name())) { |
84 | 0 | return true; |
85 | 0 | } |
86 | 0 | return std::ranges::any_of(lhs.name_mapping, |
87 | 0 | [&](const std::string& alias) { return column_has_name(rhs, alias); }); |
88 | 0 | } |
89 | | |
90 | | class ColumnMatcher { |
91 | | public: |
92 | 0 | virtual ~ColumnMatcher() = default; |
93 | | virtual const ColumnDefinition* find( |
94 | | const ColumnDefinition& table_column, |
95 | | const std::vector<ColumnDefinition>& file_schema) const = 0; |
96 | | }; |
97 | | |
98 | | class FieldIdMatcher final : public ColumnMatcher { |
99 | | public: |
100 | | const ColumnDefinition* find(const ColumnDefinition& table_column, |
101 | 0 | const std::vector<ColumnDefinition>& file_schema) const override { |
102 | 0 | if (!table_column.has_identifier_field_id()) { |
103 | 0 | return nullptr; |
104 | 0 | } |
105 | 0 | const auto field_id = table_column.get_identifier_field_id(); |
106 | 0 | const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) { |
107 | 0 | return field.has_identifier_field_id() && field.get_identifier_field_id() == field_id; |
108 | 0 | }); |
109 | 0 | return field_it == file_schema.end() ? nullptr : &*field_it; |
110 | 0 | } |
111 | | }; |
112 | | |
113 | | class NameMatcher final : public ColumnMatcher { |
114 | | public: |
115 | | const ColumnDefinition* find(const ColumnDefinition& table_column, |
116 | 0 | const std::vector<ColumnDefinition>& file_schema) const override { |
117 | 0 | const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) { |
118 | 0 | return column_names_match(table_column, field); |
119 | 0 | }); |
120 | 0 | return field_it == file_schema.end() ? nullptr : &*field_it; |
121 | 0 | } |
122 | | }; |
123 | | |
124 | | class PositionMatcher final : public ColumnMatcher { |
125 | | public: |
126 | | const ColumnDefinition* find(const ColumnDefinition& table_column, |
127 | 0 | const std::vector<ColumnDefinition>& file_schema) const override { |
128 | 0 | if (!table_column.has_identifier_field_id()) { |
129 | 0 | return nullptr; |
130 | 0 | } |
131 | 0 | const auto position = table_column.get_identifier_position(); |
132 | 0 | if (position < 0 || static_cast<size_t>(position) >= file_schema.size()) { |
133 | 0 | return nullptr; |
134 | 0 | } |
135 | 0 | return &file_schema[static_cast<size_t>(position)]; |
136 | 0 | } |
137 | | }; |
138 | | |
139 | 0 | const ColumnMatcher& matcher_for_mode(TableColumnMappingMode mode) { |
140 | 0 | static const FieldIdMatcher field_id_matcher; |
141 | 0 | static const NameMatcher name_matcher; |
142 | 0 | static const PositionMatcher position_matcher; |
143 | 0 | switch (mode) { |
144 | 0 | case TableColumnMappingMode::BY_FIELD_ID: |
145 | 0 | return field_id_matcher; |
146 | 0 | case TableColumnMappingMode::BY_NAME: |
147 | 0 | return name_matcher; |
148 | 0 | case TableColumnMappingMode::BY_INDEX: |
149 | 0 | return position_matcher; |
150 | 0 | } |
151 | 0 | return field_id_matcher; |
152 | 0 | } |
153 | | |
154 | 0 | std::string virtual_column_type_to_string(TableVirtualColumnType type) { |
155 | 0 | switch (type) { |
156 | 0 | case TableVirtualColumnType::INVALID: |
157 | 0 | return "INVALID"; |
158 | 0 | case TableVirtualColumnType::ROW_ID: |
159 | 0 | return "ROW_ID"; |
160 | 0 | case TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER: |
161 | 0 | return "LAST_UPDATED_SEQUENCE_NUMBER"; |
162 | 0 | } |
163 | 0 | return "UNKNOWN"; |
164 | 0 | } |
165 | | |
166 | 0 | std::string filter_conversion_type_to_string(FilterConversionType type) { |
167 | 0 | switch (type) { |
168 | 0 | case FilterConversionType::COPY_DIRECTLY: |
169 | 0 | return "COPY_DIRECTLY"; |
170 | 0 | case FilterConversionType::CAST_FILTER: |
171 | 0 | return "CAST_FILTER"; |
172 | 0 | case FilterConversionType::READER_EXPRESSION: |
173 | 0 | return "READER_EXPRESSION"; |
174 | 0 | case FilterConversionType::FINALIZE_ONLY: |
175 | 0 | return "FINALIZE_ONLY"; |
176 | 0 | case FilterConversionType::CONSTANT: |
177 | 0 | return "CONSTANT"; |
178 | 0 | } |
179 | 0 | return "UNKNOWN"; |
180 | 0 | } |
181 | | |
182 | 0 | std::string data_type_debug_string(const DataTypePtr& type) { |
183 | 0 | return type == nullptr ? "null" : type->get_name(); |
184 | 0 | } |
185 | | |
186 | | const Field* find_partition_value(const ColumnDefinition& table_column, |
187 | 0 | const std::map<std::string, Field>& partition_values) { |
188 | 0 | const auto find_by_name = [&](const std::string& name) -> const Field* { |
189 | 0 | const auto value_it = partition_values.find(name); |
190 | 0 | return value_it == partition_values.end() ? nullptr : &value_it->second; |
191 | 0 | }; |
192 | 0 | if (const auto* value = find_by_name(table_column.name); value != nullptr) { |
193 | 0 | return value; |
194 | 0 | } |
195 | 0 | if (table_column.has_identifier_name()) { |
196 | 0 | if (const auto* value = find_by_name(table_column.get_identifier_name()); value != nullptr) { |
197 | 0 | return value; |
198 | 0 | } |
199 | 0 | } |
200 | 0 | for (const auto& alias : table_column.name_mapping) { |
201 | 0 | if (const auto* value = find_by_name(alias); value != nullptr) { |
202 | 0 | return value; |
203 | 0 | } |
204 | 0 | } |
205 | 0 | return nullptr; |
206 | 0 | } |
207 | | |
208 | 0 | std::string field_debug_string(const Field& field) { |
209 | 0 | std::ostringstream out; |
210 | 0 | out << "Field{type=" << type_to_string(field.get_type()) << ", value="; |
211 | 0 | switch (field.get_type()) { |
212 | 0 | case TYPE_NULL: |
213 | 0 | out << "null"; |
214 | 0 | break; |
215 | 0 | case TYPE_INT: |
216 | 0 | out << field.get<TYPE_INT>(); |
217 | 0 | break; |
218 | 0 | case TYPE_BIGINT: |
219 | 0 | out << field.get<TYPE_BIGINT>(); |
220 | 0 | break; |
221 | 0 | case TYPE_STRING: |
222 | 0 | out << field.get<TYPE_STRING>(); |
223 | 0 | break; |
224 | 0 | default: |
225 | 0 | out << field.to_debug_string(0); |
226 | 0 | break; |
227 | 0 | } |
228 | 0 | out << "}"; |
229 | 0 | return out.str(); |
230 | 0 | } |
231 | | |
// Formats `values` as "[a, b, c]", where each element is rendered by streaming
// the result of `formatter(element)`. An empty vector yields "[]".
template <typename T, typename Formatter>
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
    std::ostringstream out;
    out << "[";
    const char* separator = "";
    for (const auto& value : values) {
        out << separator << formatter(value);
        separator = ", ";
    }
    out << "]";
    return out.str();
}
245 | | |
246 | | } // namespace |
247 | | |
248 | | struct FileSlotRewriteInfo { |
249 | | size_t block_position = 0; |
250 | | DataTypePtr file_type; |
251 | | DataTypePtr table_type; |
252 | | std::string file_column_name; |
253 | | }; |
254 | | |
255 | | struct RewriteContext { |
256 | | RuntimeState* runtime_state = nullptr; |
257 | | std::vector<VExprSPtr> created_exprs {}; |
258 | | |
259 | 0 | void add_created_expr(VExprSPtr expr) { created_exprs.push_back(std::move(expr)); } |
260 | | |
261 | 0 | Status prepare_created_exprs(VExprContext* context) const { |
262 | 0 | DORIS_CHECK(context != nullptr); |
263 | 0 | RowDescriptor row_desc; |
264 | 0 | for (const auto& expr : created_exprs) { |
265 | 0 | if (dynamic_cast<const Cast*>(expr.get()) != nullptr && runtime_state == nullptr) { |
266 | 0 | return Status::InvalidArgument( |
267 | 0 | "RuntimeState is required to prepare rewritten cast expression {}", |
268 | 0 | expr->expr_name()); |
269 | 0 | } |
270 | 0 | RETURN_IF_ERROR(expr->prepare(runtime_state, row_desc, context)); |
271 | 0 | } |
272 | 0 | return Status::OK(); |
273 | 0 | } |
274 | | }; |
275 | | |
// Selects one child of a struct, either by name or by 1-based ordinal.
struct StructChildSelector {
    // True: `name` is the selector. False: `ordinal` is the selector.
    bool by_name {true};
    std::string name;
    // 1-based when used; 0 is rejected by the resolvers in this file.
    size_t ordinal {0};
};
281 | | |
282 | | struct NestedStructPath { |
283 | | GlobalIndex root_global_index; |
284 | | std::vector<StructChildSelector> selectors; |
285 | | }; |
286 | | |
287 | 0 | static GlobalIndex slot_ref_global_index(const VSlotRef& slot_ref) { |
288 | 0 | DORIS_CHECK(slot_ref.column_id() >= 0); |
289 | 0 | return GlobalIndex(cast_set<size_t>(slot_ref.slot_id())); |
290 | 0 | } |
291 | | |
292 | | // A split-local literal produced by slot-literal predicate localization. This wrapper keeps the |
293 | | // original table literal so a cloned conjunct can be localized again for another split. |
294 | | class SplitLocalFileLiteral final : public TableLiteral { |
295 | | public: |
296 | | SplitLocalFileLiteral(const DataTypePtr& file_type, const Field& file_field, |
297 | | DataTypePtr original_type, Field original_field) |
298 | 0 | : TableLiteral(file_type, file_field), |
299 | 0 | _original_type(std::move(original_type)), |
300 | 0 | _original_field(std::move(original_field)) {} |
301 | 0 | const DataTypePtr& original_type() const { return _original_type; } |
302 | 0 | const Field& original_field() const { return _original_field; } |
303 | | |
304 | | private: |
305 | | DataTypePtr _original_type; |
306 | | Field _original_field; |
307 | | }; |
308 | | |
309 | | static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref, |
310 | | const FileSlotRewriteInfo& rewrite_info, |
311 | 0 | RewriteContext* rewrite_context) { |
312 | 0 | DCHECK(remove_nullable(slot_ref.data_type())->equals(*remove_nullable(rewrite_info.file_type))) |
313 | 0 | << rewrite_info.file_type->get_name() << " " << slot_ref.data_type()->get_name() << " " |
314 | 0 | << rewrite_info.table_type->get_name() << " " << rewrite_info.block_position << ' ' |
315 | 0 | << rewrite_info.file_column_name; |
316 | 0 | auto ref = TableSlotRef::create_shared(slot_ref.slot_id(), |
317 | 0 | cast_set<int>(rewrite_info.block_position), -1, |
318 | 0 | rewrite_info.file_type, rewrite_info.file_column_name); |
319 | 0 | rewrite_context->add_created_expr(ref); |
320 | 0 | return ref; |
321 | 0 | } |
322 | | |
323 | 0 | static bool is_cast_expr(const VExprSPtr& expr) { |
324 | 0 | return dynamic_cast<const Cast*>(expr.get()) != nullptr; |
325 | 0 | } |
326 | | |
327 | 0 | static bool is_binary_comparison_predicate(const VExprSPtr& expr) { |
328 | 0 | if (expr == nullptr || expr->get_num_children() != 2 || |
329 | 0 | (expr->node_type() != TExprNodeType::BINARY_PRED && |
330 | 0 | expr->node_type() != TExprNodeType::NULL_AWARE_BINARY_PRED)) { |
331 | 0 | return false; |
332 | 0 | } |
333 | 0 | switch (expr->op()) { |
334 | 0 | case TExprOpcode::EQ: |
335 | 0 | case TExprOpcode::EQ_FOR_NULL: |
336 | 0 | case TExprOpcode::NE: |
337 | 0 | case TExprOpcode::GE: |
338 | 0 | case TExprOpcode::GT: |
339 | 0 | case TExprOpcode::LE: |
340 | 0 | case TExprOpcode::LT: |
341 | 0 | return true; |
342 | 0 | default: |
343 | 0 | return false; |
344 | 0 | } |
345 | 0 | } |
346 | | |
347 | 0 | std::string TableColumnMapperOptions::debug_string() const { |
348 | 0 | std::ostringstream out; |
349 | 0 | out << "TableColumnMapperOptions{mode=" << mapping_mode_to_string(mode) |
350 | 0 | << ", allow_missing_columns=" << allow_missing_columns << "}"; |
351 | 0 | return out.str(); |
352 | 0 | } |
353 | | |
354 | 0 | std::string ColumnDefinition::debug_string() const { |
355 | 0 | std::ostringstream out; |
356 | 0 | out << "ColumnDefinition{name=" << name << ", identifier=" << field_debug_string(identifier) |
357 | 0 | << ", name_mapping=" |
358 | 0 | << join_debug_strings(name_mapping, [](const std::string& name) { return name; }) |
359 | 0 | << ", local_id=" << local_id << ", type=" << data_type_debug_string(type) << ", children=" |
360 | 0 | << join_debug_strings(children, |
361 | 0 | [](const ColumnDefinition& child) { return child.debug_string(); }) |
362 | 0 | << ", has_default_expr=" << (default_expr != nullptr) |
363 | 0 | << ", is_partition_key=" << is_partition_key << "}"; |
364 | 0 | return out.str(); |
365 | 0 | } |
366 | | |
367 | 0 | std::string LocalColumnIndex::debug_string() const { |
368 | 0 | std::ostringstream out; |
369 | 0 | out << "LocalColumnIndex{index=" << index << ", project_all_children=" << project_all_children |
370 | 0 | << ", children=" |
371 | 0 | << join_debug_strings(children, |
372 | 0 | [](const LocalColumnIndex& child) { return child.debug_string(); }) |
373 | 0 | << "}"; |
374 | 0 | return out.str(); |
375 | 0 | } |
376 | | |
377 | 0 | std::string ColumnMapping::debug_string() const { |
378 | 0 | std::ostringstream out; |
379 | 0 | out << "ColumnMapping{global_index=" << global_index |
380 | 0 | << ", table_column_name=" << table_column_name << ", file_local_id="; |
381 | 0 | if (file_local_id.has_value()) { |
382 | 0 | out << *file_local_id; |
383 | 0 | } else { |
384 | 0 | out << "null"; |
385 | 0 | } |
386 | 0 | out << ", constant_index="; |
387 | 0 | if (constant_index.has_value()) { |
388 | 0 | out << *constant_index; |
389 | 0 | } else { |
390 | 0 | out << "null"; |
391 | 0 | } |
392 | 0 | out << ", file_column_name=" << file_column_name |
393 | 0 | << ", original_file_type=" << data_type_debug_string(original_file_type) |
394 | 0 | << ", original_file_children=" |
395 | 0 | << join_debug_strings(original_file_children, |
396 | 0 | [](const ColumnDefinition& child) { return child.debug_string(); }) |
397 | 0 | << ", file_type=" << data_type_debug_string(file_type) |
398 | 0 | << ", table_type=" << data_type_debug_string(table_type) |
399 | 0 | << ", has_projection=" << (projection != nullptr) << ", child_mappings=" |
400 | 0 | << join_debug_strings(child_mappings, |
401 | 0 | [](const ColumnMapping& child) { return child.debug_string(); }) |
402 | 0 | << ", is_trivial=" << is_trivial << ", is_constant=" << constant_index.has_value() |
403 | 0 | << ", has_complex_projection=" << has_complex_projection |
404 | 0 | << ", filter_conversion=" << filter_conversion_type_to_string(filter_conversion) |
405 | 0 | << ", virtual_column_type=" << virtual_column_type_to_string(virtual_column_type) |
406 | 0 | << ", has_default_expr=" << (default_expr != nullptr) << "}"; |
407 | 0 | return out.str(); |
408 | 0 | } |
409 | | |
410 | 0 | std::string TableColumnMapper::debug_string() const { |
411 | 0 | std::ostringstream out; |
412 | 0 | out << "TableColumnMapper{options=" << _options.debug_string() << ", mappings=" |
413 | 0 | << join_debug_strings(_mappings, |
414 | 0 | [](const ColumnMapping& mapping) { return mapping.debug_string(); }) |
415 | 0 | << ", constant_count=" << _constant_map.size() << "}"; |
416 | 0 | return out.str(); |
417 | 0 | } |
418 | | |
419 | | static const FileSlotRewriteInfo* find_slot_rewrite_info( |
420 | | const VExprSPtr& expr, |
421 | | const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot, |
422 | 0 | const VSlotRef** slot_ref) { |
423 | 0 | if (expr == nullptr) { |
424 | 0 | return nullptr; |
425 | 0 | } |
426 | 0 | VExprSPtr slot_expr = expr; |
427 | 0 | const bool input_is_cast = is_cast_expr(expr) && expr->get_num_children() == 1; |
428 | 0 | if (is_cast_expr(expr) && expr->get_num_children() == 1) { |
429 | 0 | slot_expr = expr->children()[0]; |
430 | 0 | } |
431 | 0 | if (!slot_expr->is_slot_ref()) { |
432 | 0 | return nullptr; |
433 | 0 | } |
434 | 0 | const auto* candidate_slot_ref = assert_cast<const VSlotRef*>(slot_expr.get()); |
435 | 0 | const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*candidate_slot_ref)); |
436 | 0 | if (rewrite_it == global_to_file_slot.end()) { |
437 | 0 | return nullptr; |
438 | 0 | } |
439 | 0 | if (input_is_cast && !expr->data_type()->equals(*rewrite_it->second.table_type)) { |
440 | 0 | return nullptr; |
441 | 0 | } |
442 | 0 | if (slot_ref != nullptr) { |
443 | 0 | *slot_ref = candidate_slot_ref; |
444 | 0 | } |
445 | 0 | return &rewrite_it->second; |
446 | 0 | } |
447 | | |
448 | 0 | static bool filter_conversion_has_local_source(FilterConversionType conversion) { |
449 | 0 | switch (conversion) { |
450 | 0 | case FilterConversionType::COPY_DIRECTLY: |
451 | 0 | case FilterConversionType::CAST_FILTER: |
452 | 0 | case FilterConversionType::READER_EXPRESSION: |
453 | 0 | return true; |
454 | 0 | case FilterConversionType::FINALIZE_ONLY: |
455 | 0 | case FilterConversionType::CONSTANT: |
456 | 0 | return false; |
457 | 0 | } |
458 | 0 | return false; |
459 | 0 | } |
460 | | |
461 | 0 | static bool column_predicate_can_use_local_source(FilterConversionType conversion) { |
462 | 0 | switch (conversion) { |
463 | 0 | case FilterConversionType::COPY_DIRECTLY: |
464 | 0 | return true; |
465 | 0 | case FilterConversionType::CAST_FILTER: |
466 | 0 | case FilterConversionType::READER_EXPRESSION: |
467 | 0 | case FilterConversionType::FINALIZE_ONLY: |
468 | 0 | case FilterConversionType::CONSTANT: |
469 | 0 | return false; |
470 | 0 | } |
471 | 0 | return false; |
472 | 0 | } |
473 | | |
474 | | static bool table_filter_has_only_local_entries( |
475 | 0 | const TableFilter& table_filter, const std::map<GlobalIndex, FilterEntry>& filter_entries) { |
476 | 0 | for (const auto global_index : table_filter.global_indices) { |
477 | 0 | const auto entry_it = filter_entries.find(global_index); |
478 | 0 | if (entry_it == filter_entries.end() || !entry_it->second.is_local()) { |
479 | 0 | return false; |
480 | 0 | } |
481 | 0 | } |
482 | 0 | return true; |
483 | 0 | } |
484 | | |
485 | | static VExprSPtr unwrap_literal_for_file_cast(const VExprSPtr& expr, |
486 | 0 | const DataTypePtr& table_type) { |
487 | 0 | if (expr == nullptr) { |
488 | 0 | return nullptr; |
489 | 0 | } |
490 | 0 | if (expr->is_literal()) { |
491 | 0 | return expr; |
492 | 0 | } |
493 | 0 | if (is_cast_expr(expr) && expr->get_num_children() == 1 && expr->children()[0]->is_literal() && |
494 | 0 | expr->children()[0]->data_type()->equals(*table_type)) { |
495 | 0 | return expr->children()[0]; |
496 | 0 | } |
497 | 0 | return nullptr; |
498 | 0 | } |
499 | | |
500 | 0 | static Field literal_field(const VExprSPtr& literal_expr) { |
501 | 0 | DORIS_CHECK(literal_expr != nullptr); |
502 | 0 | DORIS_CHECK(literal_expr->is_literal()); |
503 | 0 | const auto* literal = dynamic_cast<const VLiteral*>(literal_expr.get()); |
504 | 0 | DORIS_CHECK(literal != nullptr); |
505 | 0 | Field field; |
506 | 0 | literal->get_column_ptr()->get(0, field); |
507 | 0 | return field; |
508 | 0 | } |
509 | | |
510 | 0 | static TExprNode rebuild_expr_node(const VExpr& expr) { |
511 | 0 | TExprNode node; |
512 | 0 | node.__set_node_type(expr.node_type()); |
513 | 0 | node.__set_opcode(expr.op()); |
514 | 0 | node.__set_type(create_type_desc(remove_nullable(expr.data_type())->get_primitive_type(), |
515 | 0 | cast_set<int>(expr.data_type()->get_precision()), |
516 | 0 | cast_set<int>(expr.data_type()->get_scale()))); |
517 | 0 | node.__set_is_nullable(expr.data_type()->is_nullable()); |
518 | 0 | node.__set_num_children(expr.get_num_children()); |
519 | 0 | node.__set_fn(expr.fn()); |
520 | 0 | if (const auto* in_pred = dynamic_cast<const VInPredicate*>(&expr)) { |
521 | 0 | TInPredicate in_predicate; |
522 | 0 | in_predicate.__set_is_not_in(in_pred->is_not_in()); |
523 | 0 | node.__set_in_predicate(in_predicate); |
524 | 0 | } |
525 | 0 | return node; |
526 | 0 | } |
527 | | |
528 | | // TODO(gabriel): could we clone it in another way? |
529 | 0 | Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr) { |
530 | 0 | DORIS_CHECK(cloned_expr != nullptr); |
531 | 0 | if (expr == nullptr) { |
532 | 0 | *cloned_expr = nullptr; |
533 | 0 | return Status::OK(); |
534 | 0 | } |
535 | | |
536 | 0 | VExprSPtr cloned; |
537 | 0 | if (const auto* table_slot_ref = dynamic_cast<const TableSlotRef*>(expr.get())) { |
538 | 0 | cloned = TableSlotRef::create_shared(table_slot_ref->slot_id(), table_slot_ref->column_id(), |
539 | 0 | table_slot_ref->column_uniq_id(), |
540 | 0 | table_slot_ref->data_type(), |
541 | 0 | table_slot_ref->column_name()); |
542 | 0 | } else if (const auto* vslot_ref = dynamic_cast<const VSlotRef*>(expr.get())) { |
543 | 0 | cloned = TableSlotRef::create_shared(vslot_ref->slot_id(), vslot_ref->column_id(), |
544 | 0 | vslot_ref->column_uniq_id(), vslot_ref->data_type(), |
545 | 0 | vslot_ref->column_name()); |
546 | 0 | } else if (const auto* split_literal = dynamic_cast<const SplitLocalFileLiteral*>(expr.get())) { |
547 | 0 | cloned = std::make_shared<SplitLocalFileLiteral>( |
548 | 0 | split_literal->data_type(), literal_field(expr), split_literal->original_type(), |
549 | 0 | split_literal->original_field()); |
550 | 0 | } else if (dynamic_cast<const TableLiteral*>(expr.get()) != nullptr) { |
551 | 0 | cloned = TableLiteral::create_shared(expr->data_type(), literal_field(expr)); |
552 | 0 | } else if (expr->is_literal()) { |
553 | 0 | cloned = TableLiteral::create_shared(expr->data_type(), literal_field(expr)); |
554 | 0 | } else if (const auto* cast_expr = dynamic_cast<const Cast*>(expr.get())) { |
555 | 0 | cloned = std::make_shared<Cast>(cast_expr->data_type()); |
556 | 0 | } else if (const auto* vcast_expr = dynamic_cast<const VCastExpr*>(expr.get()); |
557 | 0 | vcast_expr != nullptr && vcast_expr->node_type() == TExprNodeType::CAST_EXPR) { |
558 | 0 | cloned = std::make_shared<Cast>(vcast_expr->data_type()); |
559 | 0 | } else if (const auto* in_pred = dynamic_cast<const VInPredicate*>(expr.get())) { |
560 | 0 | cloned = VInPredicate::create_shared(rebuild_expr_node(*in_pred)); |
561 | 0 | } else if (const auto* direct_in_pred = dynamic_cast<const VDirectInPredicate*>(expr.get())) { |
562 | 0 | cloned = std::make_shared<VDirectInPredicate>(rebuild_expr_node(*direct_in_pred), |
563 | 0 | direct_in_pred->get_set_func()); |
564 | 0 | } else if (const auto* compound_pred = dynamic_cast<const VCompoundPred*>(expr.get())) { |
565 | 0 | cloned = VCompoundPred::create_shared(rebuild_expr_node(*compound_pred)); |
566 | 0 | } else if (const auto* fn_call = dynamic_cast<const VectorizedFnCall*>(expr.get())) { |
567 | 0 | cloned = VectorizedFnCall::create_shared(rebuild_expr_node(*fn_call)); |
568 | 0 | } else { |
569 | 0 | return Status::NotSupported("Cannot clone expression {} for file-local rewrite", |
570 | 0 | expr->expr_name()); |
571 | 0 | } |
572 | | |
573 | 0 | VExprSPtrs cloned_children; |
574 | 0 | cloned_children.reserve(expr->children().size()); |
575 | 0 | for (const auto& child : expr->children()) { |
576 | 0 | VExprSPtr cloned_child; |
577 | 0 | RETURN_IF_ERROR(clone_table_expr_tree(child, &cloned_child)); |
578 | 0 | cloned_children.push_back(std::move(cloned_child)); |
579 | 0 | } |
580 | 0 | cloned->set_children(std::move(cloned_children)); |
581 | 0 | cloned->reset_prepare_state(); |
582 | 0 | *cloned_expr = std::move(cloned); |
583 | 0 | return Status::OK(); |
584 | 0 | } |
585 | | |
586 | | static VExprSPtr original_table_literal(const VExprSPtr& literal_expr, |
587 | 0 | RewriteContext* rewrite_context = nullptr) { |
588 | 0 | DORIS_CHECK(literal_expr != nullptr); |
589 | 0 | DORIS_CHECK(literal_expr->is_literal()); |
590 | 0 | const auto* rewritten_literal = dynamic_cast<const SplitLocalFileLiteral*>(literal_expr.get()); |
591 | 0 | if (rewritten_literal == nullptr) { |
592 | 0 | return literal_expr; |
593 | 0 | } |
594 | 0 | auto literal = TableLiteral::create_shared(rewritten_literal->original_type(), |
595 | 0 | rewritten_literal->original_field()); |
596 | 0 | if (rewrite_context != nullptr) { |
597 | 0 | rewrite_context->add_created_expr(literal); |
598 | 0 | } |
599 | 0 | return literal; |
600 | 0 | } |
601 | | |
602 | 0 | static bool is_struct_element_expr(const VExprSPtr& expr) { |
603 | 0 | return expr != nullptr && expr->get_num_children() == 2 && |
604 | 0 | expr->fn().name.function_name == "struct_element"; |
605 | 0 | } |
606 | | |
607 | 0 | static bool parse_struct_child_selector(const VExprSPtr& expr, StructChildSelector* selector) { |
608 | 0 | DORIS_CHECK(selector != nullptr); |
609 | 0 | if (expr == nullptr || !expr->is_literal()) { |
610 | 0 | return false; |
611 | 0 | } |
612 | 0 | const Field field = literal_field(expr); |
613 | 0 | switch (field.get_type()) { |
614 | 0 | case TYPE_STRING: |
615 | 0 | case TYPE_CHAR: |
616 | 0 | case TYPE_VARCHAR: |
617 | 0 | selector->by_name = true; |
618 | 0 | selector->name = std::string(field.as_string_view()); |
619 | 0 | return true; |
620 | 0 | case TYPE_BOOLEAN: |
621 | 0 | selector->by_name = false; |
622 | 0 | selector->ordinal = field.get<TYPE_BOOLEAN>() ? 1 : 0; |
623 | 0 | return selector->ordinal > 0; |
624 | 0 | case TYPE_TINYINT: |
625 | 0 | selector->by_name = false; |
626 | 0 | if (field.get<TYPE_TINYINT>() <= 0) { |
627 | 0 | return false; |
628 | 0 | } |
629 | 0 | selector->ordinal = cast_set<size_t>(field.get<TYPE_TINYINT>()); |
630 | 0 | return true; |
631 | 0 | case TYPE_SMALLINT: |
632 | 0 | selector->by_name = false; |
633 | 0 | if (field.get<TYPE_SMALLINT>() <= 0) { |
634 | 0 | return false; |
635 | 0 | } |
636 | 0 | selector->ordinal = cast_set<size_t>(field.get<TYPE_SMALLINT>()); |
637 | 0 | return true; |
638 | 0 | case TYPE_INT: |
639 | 0 | selector->by_name = false; |
640 | 0 | if (field.get<TYPE_INT>() <= 0) { |
641 | 0 | return false; |
642 | 0 | } |
643 | 0 | selector->ordinal = cast_set<size_t>(field.get<TYPE_INT>()); |
644 | 0 | return true; |
645 | 0 | case TYPE_BIGINT: |
646 | 0 | selector->by_name = false; |
647 | 0 | if (field.get<TYPE_BIGINT>() <= 0) { |
648 | 0 | return false; |
649 | 0 | } |
650 | 0 | selector->ordinal = cast_set<size_t>(field.get<TYPE_BIGINT>()); |
651 | 0 | return true; |
652 | 0 | default: |
653 | 0 | return false; |
654 | 0 | } |
655 | 0 | } |
656 | | |
657 | 0 | static bool extract_nested_struct_path(const VExprSPtr& expr, NestedStructPath* path) { |
658 | 0 | DORIS_CHECK(path != nullptr); |
659 | 0 | if (!is_struct_element_expr(expr)) { |
660 | 0 | return false; |
661 | 0 | } |
662 | | |
663 | 0 | StructChildSelector selector; |
664 | 0 | if (!parse_struct_child_selector(expr->children()[1], &selector)) { |
665 | 0 | return false; |
666 | 0 | } |
667 | | |
668 | 0 | const auto& parent = expr->children()[0]; |
669 | 0 | if (parent->is_slot_ref()) { |
670 | 0 | const auto* slot_ref = assert_cast<const VSlotRef*>(parent.get()); |
671 | 0 | path->root_global_index = slot_ref_global_index(*slot_ref); |
672 | 0 | path->selectors.clear(); |
673 | 0 | path->selectors.push_back(std::move(selector)); |
674 | 0 | return true; |
675 | 0 | } |
676 | | |
677 | 0 | if (!extract_nested_struct_path(parent, path)) { |
678 | 0 | return false; |
679 | 0 | } |
680 | 0 | path->selectors.push_back(std::move(selector)); |
681 | 0 | return true; |
682 | 0 | } |
683 | | |
684 | | static void collect_nested_struct_paths(const VExprSPtr& expr, |
685 | 0 | std::vector<NestedStructPath>* paths) { |
686 | 0 | DORIS_CHECK(paths != nullptr); |
687 | 0 | if (expr == nullptr) { |
688 | 0 | return; |
689 | 0 | } |
690 | 0 | NestedStructPath path; |
691 | 0 | if (extract_nested_struct_path(expr, &path)) { |
692 | 0 | paths->push_back(std::move(path)); |
693 | 0 | return; |
694 | 0 | } |
695 | 0 | for (const auto& child : expr->children()) { |
696 | 0 | collect_nested_struct_paths(child, paths); |
697 | 0 | } |
698 | 0 | } |
699 | | |
700 | | static const ColumnDefinition* resolve_file_child(const std::vector<ColumnDefinition>& children, |
701 | 0 | const StructChildSelector& selector) { |
702 | 0 | if (selector.by_name) { |
703 | 0 | const auto child_it = std::ranges::find_if(children, [&](const ColumnDefinition& child) { |
704 | 0 | return child.name == selector.name; |
705 | 0 | }); |
706 | 0 | return child_it == children.end() ? nullptr : &*child_it; |
707 | 0 | } |
708 | 0 | if (selector.ordinal == 0 || selector.ordinal > children.size()) { |
709 | 0 | return nullptr; |
710 | 0 | } |
711 | 0 | return &children[selector.ordinal - 1]; |
712 | 0 | } |
713 | | |
714 | 0 | static const DataTypeStruct* struct_type_or_null(const DataTypePtr& type) { |
715 | 0 | if (type == nullptr) { |
716 | 0 | return nullptr; |
717 | 0 | } |
718 | 0 | const auto nested_type = remove_nullable(type); |
719 | 0 | if (nested_type->get_primitive_type() != TYPE_STRUCT) { |
720 | 0 | return nullptr; |
721 | 0 | } |
722 | 0 | return assert_cast<const DataTypeStruct*>(nested_type.get()); |
723 | 0 | } |
724 | | |
725 | | static std::optional<int32_t> struct_child_index(const ColumnMapping& mapping, |
726 | 0 | const StructChildSelector& selector) { |
727 | 0 | const auto* struct_type = struct_type_or_null(mapping.table_type); |
728 | 0 | if (struct_type == nullptr) { |
729 | 0 | return std::nullopt; |
730 | 0 | } |
731 | 0 | if (selector.by_name) { |
732 | 0 | const auto position = struct_type->try_get_position_by_name(selector.name); |
733 | 0 | if (!position.has_value()) { |
734 | 0 | return std::nullopt; |
735 | 0 | } |
736 | 0 | return cast_set<int32_t>(*position); |
737 | 0 | } |
738 | 0 | if (selector.ordinal == 0 || selector.ordinal > struct_type->get_elements().size()) { |
739 | 0 | return std::nullopt; |
740 | 0 | } |
741 | 0 | return cast_set<int32_t>(selector.ordinal - 1); |
742 | 0 | } |
743 | | |
744 | | static int32_t child_mapping_global_index(const ColumnMapping& mapping, |
745 | | const ColumnMapping& child_mapping, |
746 | 0 | size_t fallback_child_idx) { |
747 | 0 | const auto* struct_type = struct_type_or_null(mapping.table_type); |
748 | 0 | if (struct_type == nullptr) { |
749 | 0 | return cast_set<int32_t>(fallback_child_idx); |
750 | 0 | } |
751 | 0 | const auto position = struct_type->try_get_position_by_name(child_mapping.table_column_name); |
752 | 0 | DORIS_CHECK(position.has_value()) << "Cannot find child '" << child_mapping.table_column_name |
753 | 0 | << "' in table type " << mapping.table_type->get_name(); |
754 | 0 | return cast_set<int32_t>(*position); |
755 | 0 | } |
756 | | |
757 | | static const ColumnMapping* resolve_mapped_child(const ColumnMapping& mapping, |
758 | 0 | int32_t global_child_index) { |
759 | 0 | for (size_t child_idx = 0; child_idx < mapping.child_mappings.size(); ++child_idx) { |
760 | 0 | const auto& child_mapping = mapping.child_mappings[child_idx]; |
761 | 0 | if (child_mapping_global_index(mapping, child_mapping, child_idx) == global_child_index) { |
762 | 0 | return &child_mapping; |
763 | 0 | } |
764 | 0 | } |
765 | 0 | return nullptr; |
766 | 0 | } |
767 | | |
768 | | static const ColumnMapping* resolve_mapped_child(const ColumnMapping& mapping, |
769 | 0 | const StructChildSelector& selector) { |
770 | 0 | const auto global_child_index = struct_child_index(mapping, selector); |
771 | 0 | if (!global_child_index.has_value()) { |
772 | 0 | return nullptr; |
773 | 0 | } |
774 | 0 | return resolve_mapped_child(mapping, *global_child_index); |
775 | 0 | } |
776 | | |
777 | 0 | static std::shared_ptr<IndexMapping> build_child_index_mapping(const ColumnMapping& mapping) { |
778 | 0 | DORIS_CHECK(mapping.file_local_id.has_value()); |
779 | 0 | auto result = std::make_shared<IndexMapping>(); |
780 | 0 | result->index = *mapping.file_local_id; |
781 | 0 | for (size_t child_idx = 0; child_idx < mapping.child_mappings.size(); ++child_idx) { |
782 | 0 | const auto& child_mapping = mapping.child_mappings[child_idx]; |
783 | 0 | if (!child_mapping.file_local_id.has_value()) { |
784 | 0 | continue; |
785 | 0 | } |
786 | 0 | result->child_mapping.emplace(child_mapping_global_index(mapping, child_mapping, child_idx), |
787 | 0 | build_child_index_mapping(child_mapping)); |
788 | 0 | } |
789 | 0 | return result; |
790 | 0 | } |
791 | | |
792 | 0 | static IndexMapping build_index_mapping(const ColumnMapping& mapping, LocalIndex block_position) { |
793 | 0 | DORIS_CHECK(mapping.file_local_id.has_value()); |
794 | 0 | IndexMapping result; |
795 | 0 | result.index = cast_set<int32_t>(block_position.value()); |
796 | 0 | for (size_t child_idx = 0; child_idx < mapping.child_mappings.size(); ++child_idx) { |
797 | 0 | const auto& child_mapping = mapping.child_mappings[child_idx]; |
798 | 0 | if (!child_mapping.file_local_id.has_value()) { |
799 | 0 | continue; |
800 | 0 | } |
801 | 0 | result.child_mapping.emplace(child_mapping_global_index(mapping, child_mapping, child_idx), |
802 | 0 | build_child_index_mapping(child_mapping)); |
803 | 0 | } |
804 | 0 | return result; |
805 | 0 | } |
806 | | |
807 | | static bool resolve_nested_projection_with_index_mapping(const NestedStructPath& path, |
808 | | const std::vector<ColumnMapping>& mappings, |
809 | | LocalColumnIndex* root_projection, |
810 | 0 | const ColumnMapping** leaf_mapping) { |
811 | 0 | DORIS_CHECK(root_projection != nullptr); |
812 | 0 | DORIS_CHECK(leaf_mapping != nullptr); |
813 | 0 | *root_projection = {}; |
814 | 0 | *leaf_mapping = nullptr; |
815 | 0 | if (path.selectors.empty()) { |
816 | 0 | return false; |
817 | 0 | } |
818 | 0 | const auto mapping_it = std::ranges::find_if(mappings, [&](const ColumnMapping& mapping) { |
819 | 0 | return mapping.global_index == path.root_global_index; |
820 | 0 | }); |
821 | 0 | if (mapping_it == mappings.end() || !mapping_it->file_local_id.has_value()) { |
822 | 0 | return false; |
823 | 0 | } |
824 | | |
825 | 0 | const auto root_index_mapping = build_index_mapping(*mapping_it, LocalIndex(0)); |
826 | 0 | *root_projection = LocalColumnIndex::partial_field(*mapping_it->file_local_id); |
827 | 0 | auto* current_projection = root_projection; |
828 | 0 | const auto* current_mapping = &*mapping_it; |
829 | 0 | const auto* current_index_mapping = &root_index_mapping; |
830 | 0 | for (size_t selector_idx = 0; selector_idx < path.selectors.size(); ++selector_idx) { |
831 | 0 | const auto global_child_index = |
832 | 0 | struct_child_index(*current_mapping, path.selectors[selector_idx]); |
833 | 0 | if (!global_child_index.has_value()) { |
834 | 0 | *root_projection = {}; |
835 | 0 | return false; |
836 | 0 | } |
837 | 0 | const auto index_mapping_it = |
838 | 0 | current_index_mapping->child_mapping.find(*global_child_index); |
839 | 0 | if (index_mapping_it == current_index_mapping->child_mapping.end()) { |
840 | 0 | *root_projection = {}; |
841 | 0 | return false; |
842 | 0 | } |
843 | 0 | const auto* child_mapping = resolve_mapped_child(*current_mapping, *global_child_index); |
844 | 0 | DORIS_CHECK(child_mapping != nullptr); |
845 | 0 | DORIS_CHECK(child_mapping->file_local_id.has_value()); |
846 | |
|
847 | 0 | auto child_projection = LocalColumnIndex::partial_field(index_mapping_it->second->index); |
848 | 0 | child_projection.project_all_children = selector_idx + 1 == path.selectors.size(); |
849 | 0 | current_projection->children.push_back(std::move(child_projection)); |
850 | 0 | current_projection = ¤t_projection->children.back(); |
851 | 0 | current_mapping = child_mapping; |
852 | 0 | current_index_mapping = index_mapping_it->second.get(); |
853 | 0 | } |
854 | 0 | *leaf_mapping = current_mapping; |
855 | 0 | return true; |
856 | 0 | } |
857 | | |
858 | | static Status build_filter_projection_path(const std::vector<ColumnDefinition>& children, |
859 | | std::span<const StructChildSelector> selectors, |
860 | 0 | LocalColumnIndex* projection) { |
861 | 0 | DORIS_CHECK(projection != nullptr); |
862 | 0 | if (selectors.empty()) { |
863 | 0 | return Status::InvalidArgument("Nested struct selector path is empty"); |
864 | 0 | } |
865 | 0 | const auto* child = resolve_file_child(children, selectors.front()); |
866 | 0 | if (child == nullptr) { |
867 | 0 | return Status::OK(); |
868 | 0 | } |
869 | 0 | *projection = LocalColumnIndex::field(child->file_local_id()); |
870 | 0 | projection->project_all_children = selectors.size() == 1; |
871 | 0 | projection->children.clear(); |
872 | 0 | if (selectors.size() == 1) { |
873 | 0 | return Status::OK(); |
874 | 0 | } |
875 | 0 | if (child->children.empty() || |
876 | 0 | remove_nullable(child->type)->get_primitive_type() != TYPE_STRUCT) { |
877 | 0 | *projection = LocalColumnIndex {}; |
878 | 0 | return Status::OK(); |
879 | 0 | } |
880 | 0 | LocalColumnIndex child_projection; |
881 | 0 | RETURN_IF_ERROR( |
882 | 0 | build_filter_projection_path(child->children, selectors.subspan(1), &child_projection)); |
883 | 0 | if (child_projection.field_id() < 0) { |
884 | 0 | *projection = LocalColumnIndex {}; |
885 | 0 | return Status::OK(); |
886 | 0 | } |
887 | 0 | projection->children.push_back(std::move(child_projection)); |
888 | 0 | return Status::OK(); |
889 | 0 | } |
890 | | |
891 | | // Prefer the table-to-file mapping tree for nested filter projection. This keeps renamed |
892 | | // children and field-id schema evolution in the mapper instead of leaking table names into the |
893 | | // file reader request. The file schema fallback below is only for filter-only children that do not |
894 | | // have an output child mapping yet. |
895 | | static Status build_filter_projection_path(const ColumnMapping& mapping, |
896 | | std::span<const StructChildSelector> selectors, |
897 | 0 | LocalColumnIndex* projection) { |
898 | 0 | DORIS_CHECK(projection != nullptr); |
899 | 0 | if (selectors.empty()) { |
900 | 0 | return Status::InvalidArgument("Nested struct selector path is empty"); |
901 | 0 | } |
902 | 0 | const auto* child_mapping = resolve_mapped_child(mapping, selectors.front()); |
903 | 0 | if (child_mapping == nullptr) { |
904 | 0 | return build_filter_projection_path(mapping.original_file_children, selectors, projection); |
905 | 0 | } |
906 | 0 | if (!child_mapping->file_local_id.has_value()) { |
907 | 0 | *projection = LocalColumnIndex {}; |
908 | 0 | return Status::OK(); |
909 | 0 | } |
910 | 0 | *projection = LocalColumnIndex::field(*child_mapping->file_local_id); |
911 | 0 | projection->project_all_children = selectors.size() == 1; |
912 | 0 | projection->children.clear(); |
913 | 0 | if (selectors.size() == 1) { |
914 | 0 | return Status::OK(); |
915 | 0 | } |
916 | 0 | LocalColumnIndex child_projection; |
917 | 0 | if (child_mapping->child_mappings.empty()) { |
918 | 0 | RETURN_IF_ERROR(build_filter_projection_path(child_mapping->original_file_children, |
919 | 0 | selectors.subspan(1), &child_projection)); |
920 | 0 | } else { |
921 | 0 | RETURN_IF_ERROR(build_filter_projection_path(*child_mapping, selectors.subspan(1), |
922 | 0 | &child_projection)); |
923 | 0 | } |
924 | 0 | if (child_projection.field_id() < 0) { |
925 | 0 | *projection = LocalColumnIndex {}; |
926 | 0 | return Status::OK(); |
927 | 0 | } |
928 | 0 | projection->children.push_back(std::move(child_projection)); |
929 | 0 | return Status::OK(); |
930 | 0 | } |
931 | | |
932 | 0 | static std::optional<PredicateType> to_column_predicate_type(TExprOpcode::type opcode) { |
933 | 0 | switch (opcode) { |
934 | 0 | case TExprOpcode::EQ: |
935 | 0 | return PredicateType::EQ; |
936 | 0 | case TExprOpcode::NE: |
937 | 0 | return PredicateType::NE; |
938 | 0 | case TExprOpcode::GT: |
939 | 0 | return PredicateType::GT; |
940 | 0 | case TExprOpcode::GE: |
941 | 0 | return PredicateType::GE; |
942 | 0 | case TExprOpcode::LT: |
943 | 0 | return PredicateType::LT; |
944 | 0 | case TExprOpcode::LE: |
945 | 0 | return PredicateType::LE; |
946 | 0 | default: |
947 | 0 | return std::nullopt; |
948 | 0 | } |
949 | 0 | } |
950 | | |
951 | 0 | static TExprOpcode::type reverse_comparison_opcode(TExprOpcode::type opcode) { |
952 | 0 | switch (opcode) { |
953 | 0 | case TExprOpcode::GT: |
954 | 0 | return TExprOpcode::LT; |
955 | 0 | case TExprOpcode::GE: |
956 | 0 | return TExprOpcode::LE; |
957 | 0 | case TExprOpcode::LT: |
958 | 0 | return TExprOpcode::GT; |
959 | 0 | case TExprOpcode::LE: |
960 | 0 | return TExprOpcode::GE; |
961 | 0 | default: |
962 | 0 | return opcode; |
963 | 0 | } |
964 | 0 | } |
965 | | |
966 | | static std::shared_ptr<ColumnPredicate> create_comparison_column_predicate( |
967 | | PredicateType predicate_type, uint32_t column_id, const std::string& column_name, |
968 | 0 | const DataTypePtr& data_type, const Field& value) { |
969 | 0 | switch (predicate_type) { |
970 | 0 | case PredicateType::EQ: |
971 | 0 | return create_comparison_predicate<PredicateType::EQ>(column_id, column_name, data_type, |
972 | 0 | value, false); |
973 | 0 | case PredicateType::NE: |
974 | 0 | return create_comparison_predicate<PredicateType::NE>(column_id, column_name, data_type, |
975 | 0 | value, false); |
976 | 0 | case PredicateType::GT: |
977 | 0 | return create_comparison_predicate<PredicateType::GT>(column_id, column_name, data_type, |
978 | 0 | value, false); |
979 | 0 | case PredicateType::GE: |
980 | 0 | return create_comparison_predicate<PredicateType::GE>(column_id, column_name, data_type, |
981 | 0 | value, false); |
982 | 0 | case PredicateType::LT: |
983 | 0 | return create_comparison_predicate<PredicateType::LT>(column_id, column_name, data_type, |
984 | 0 | value, false); |
985 | 0 | case PredicateType::LE: |
986 | 0 | return create_comparison_predicate<PredicateType::LE>(column_id, column_name, data_type, |
987 | 0 | value, false); |
988 | 0 | default: |
989 | 0 | return nullptr; |
990 | 0 | } |
991 | 0 | } |
992 | | |
993 | | static bool extract_child_id_path_from_projection(const LocalColumnIndex& root_projection, |
994 | 0 | std::vector<int32_t>* file_child_id_path) { |
995 | 0 | DORIS_CHECK(file_child_id_path != nullptr); |
996 | 0 | file_child_id_path->clear(); |
997 | 0 | const auto* current_projection = &root_projection; |
998 | 0 | while (!current_projection->children.empty()) { |
999 | 0 | if (current_projection->children.size() != 1) { |
1000 | 0 | file_child_id_path->clear(); |
1001 | 0 | return false; |
1002 | 0 | } |
1003 | 0 | current_projection = ¤t_projection->children[0]; |
1004 | 0 | file_child_id_path->push_back(current_projection->field_id()); |
1005 | 0 | } |
1006 | 0 | return !file_child_id_path->empty(); |
1007 | 0 | } |
1008 | | |
1009 | | static std::shared_ptr<ColumnPredicate> build_nested_comparison_predicate( |
1010 | | const VExprSPtr& literal_expr, TExprOpcode::type opcode, LocalColumnId root_file_column_id, |
1011 | 0 | const std::string& leaf_name, const DataTypePtr& file_leaf_type) { |
1012 | 0 | if (literal_expr == nullptr || !literal_expr->is_literal() || file_leaf_type == nullptr) { |
1013 | 0 | return nullptr; |
1014 | 0 | } |
1015 | 0 | const auto predicate_type = to_column_predicate_type(opcode); |
1016 | 0 | if (!predicate_type.has_value()) { |
1017 | 0 | return nullptr; |
1018 | 0 | } |
1019 | 0 | const auto original_literal = original_table_literal(literal_expr); |
1020 | 0 | const Field original_field = literal_field(original_literal); |
1021 | 0 | Field file_field; |
1022 | 0 | try { |
1023 | 0 | convert_field_to_type(original_field, *file_leaf_type, &file_field, |
1024 | 0 | original_literal->data_type().get()); |
1025 | 0 | } catch (const Exception&) { |
1026 | 0 | return nullptr; |
1027 | 0 | } |
1028 | 0 | if (file_field.is_null()) { |
1029 | 0 | return nullptr; |
1030 | 0 | } |
1031 | 0 | try { |
1032 | 0 | return create_comparison_column_predicate(*predicate_type, |
1033 | 0 | cast_set<uint32_t>(root_file_column_id.value()), |
1034 | 0 | leaf_name, file_leaf_type, file_field); |
1035 | 0 | } catch (const Exception&) { |
1036 | 0 | return nullptr; |
1037 | 0 | } |
1038 | 0 | } |
1039 | | |
1040 | | static std::shared_ptr<ColumnPredicate> build_nested_in_list_predicate( |
1041 | | const VExprSPtrs& literal_exprs, LocalColumnId root_file_column_id, |
1042 | 0 | const std::string& leaf_name, const DataTypePtr& file_leaf_type) { |
1043 | 0 | if (literal_exprs.empty() || file_leaf_type == nullptr) { |
1044 | 0 | return nullptr; |
1045 | 0 | } |
1046 | | |
1047 | 0 | auto value_column = file_leaf_type->create_column(); |
1048 | 0 | for (const auto& literal_expr : literal_exprs) { |
1049 | 0 | if (literal_expr == nullptr || !literal_expr->is_literal()) { |
1050 | 0 | return nullptr; |
1051 | 0 | } |
1052 | 0 | const auto original_literal = original_table_literal(literal_expr); |
1053 | 0 | const Field original_field = literal_field(original_literal); |
1054 | 0 | Field file_field; |
1055 | 0 | try { |
1056 | 0 | convert_field_to_type(original_field, *file_leaf_type, &file_field, |
1057 | 0 | original_literal->data_type().get()); |
1058 | 0 | } catch (const Exception&) { |
1059 | 0 | return nullptr; |
1060 | 0 | } |
1061 | 0 | if (file_field.is_null()) { |
1062 | 0 | return nullptr; |
1063 | 0 | } |
1064 | 0 | value_column->insert(file_field); |
1065 | 0 | } |
1066 | | |
1067 | 0 | std::shared_ptr<HybridSetBase> values; |
1068 | 0 | try { |
1069 | 0 | values.reset(create_set(file_leaf_type->get_primitive_type(), literal_exprs.size(), false)); |
1070 | 0 | ColumnPtr value_column_ptr = std::move(value_column); |
1071 | 0 | values->insert_range_from(value_column_ptr, 0, value_column_ptr->size()); |
1072 | 0 | return create_in_list_predicate<PredicateType::IN_LIST>( |
1073 | 0 | cast_set<uint32_t>(root_file_column_id.value()), leaf_name, file_leaf_type, values, |
1074 | 0 | false); |
1075 | 0 | } catch (const Exception&) { |
1076 | 0 | return nullptr; |
1077 | 0 | } |
1078 | 0 | } |
1079 | | |
1080 | | static bool extract_nested_binary_comparison_filter(const VExprSPtr& expr, |
1081 | | const std::vector<ColumnMapping>& mappings, |
1082 | 0 | FileColumnPredicateFilter* column_filter) { |
1083 | 0 | DORIS_CHECK(column_filter != nullptr); |
1084 | 0 | if (!is_binary_comparison_predicate(expr)) { |
1085 | 0 | return false; |
1086 | 0 | } |
1087 | 0 | NestedStructPath path; |
1088 | 0 | VExprSPtr literal_expr; |
1089 | 0 | TExprOpcode::type opcode = expr->op(); |
1090 | 0 | if (extract_nested_struct_path(expr->children()[0], &path) && |
1091 | 0 | expr->children()[1]->is_literal()) { |
1092 | 0 | literal_expr = expr->children()[1]; |
1093 | 0 | } else if (extract_nested_struct_path(expr->children()[1], &path) && |
1094 | 0 | expr->children()[0]->is_literal()) { |
1095 | 0 | literal_expr = expr->children()[0]; |
1096 | 0 | opcode = reverse_comparison_opcode(opcode); |
1097 | 0 | } else { |
1098 | 0 | return false; |
1099 | 0 | } |
1100 | | |
1101 | 0 | LocalColumnIndex file_projection; |
1102 | 0 | const ColumnMapping* leaf_mapping = nullptr; |
1103 | 0 | if (!resolve_nested_projection_with_index_mapping(path, mappings, &file_projection, |
1104 | 0 | &leaf_mapping) || |
1105 | 0 | leaf_mapping == nullptr || leaf_mapping->file_type == nullptr || |
1106 | 0 | is_complex_type(remove_nullable(leaf_mapping->file_type)->get_primitive_type())) { |
1107 | 0 | return false; |
1108 | 0 | } |
1109 | 0 | auto predicate = build_nested_comparison_predicate( |
1110 | 0 | literal_expr, opcode, file_projection.column_id(), leaf_mapping->file_column_name, |
1111 | 0 | remove_nullable(leaf_mapping->file_type)); |
1112 | 0 | if (predicate == nullptr) { |
1113 | 0 | return false; |
1114 | 0 | } |
1115 | 0 | std::vector<int32_t> file_child_id_path; |
1116 | 0 | if (!extract_child_id_path_from_projection(file_projection, &file_child_id_path)) { |
1117 | 0 | return false; |
1118 | 0 | } |
1119 | 0 | column_filter->file_column_id = file_projection.column_id(); |
1120 | 0 | column_filter->file_child_id_path = std::move(file_child_id_path); |
1121 | 0 | column_filter->predicates.push_back(std::move(predicate)); |
1122 | 0 | return true; |
1123 | 0 | } |
1124 | | |
1125 | | static bool extract_nested_in_list_filter(const VExprSPtr& expr, |
1126 | | const std::vector<ColumnMapping>& mappings, |
1127 | 0 | FileColumnPredicateFilter* column_filter) { |
1128 | 0 | DORIS_CHECK(column_filter != nullptr); |
1129 | 0 | if (expr == nullptr || expr->node_type() != TExprNodeType::IN_PRED || |
1130 | 0 | expr->get_num_children() < 2) { |
1131 | 0 | return false; |
1132 | 0 | } |
1133 | 0 | if (const auto* in_predicate = dynamic_cast<const VInPredicate*>(expr.get()); |
1134 | 0 | in_predicate != nullptr && in_predicate->is_not_in()) { |
1135 | 0 | return false; |
1136 | 0 | } |
1137 | | |
1138 | 0 | NestedStructPath path; |
1139 | 0 | if (!extract_nested_struct_path(expr->children()[0], &path)) { |
1140 | 0 | return false; |
1141 | 0 | } |
1142 | | |
1143 | 0 | VExprSPtrs literal_exprs; |
1144 | 0 | literal_exprs.reserve(expr->get_num_children() - 1); |
1145 | 0 | for (size_t child_idx = 1; child_idx < expr->children().size(); ++child_idx) { |
1146 | 0 | if (!expr->children()[child_idx]->is_literal()) { |
1147 | 0 | return false; |
1148 | 0 | } |
1149 | 0 | literal_exprs.push_back(expr->children()[child_idx]); |
1150 | 0 | } |
1151 | | |
1152 | 0 | LocalColumnIndex file_projection; |
1153 | 0 | const ColumnMapping* leaf_mapping = nullptr; |
1154 | 0 | if (!resolve_nested_projection_with_index_mapping(path, mappings, &file_projection, |
1155 | 0 | &leaf_mapping) || |
1156 | 0 | leaf_mapping == nullptr || leaf_mapping->file_type == nullptr || |
1157 | 0 | is_complex_type(remove_nullable(leaf_mapping->file_type)->get_primitive_type())) { |
1158 | 0 | return false; |
1159 | 0 | } |
1160 | 0 | auto predicate = build_nested_in_list_predicate(literal_exprs, file_projection.column_id(), |
1161 | 0 | leaf_mapping->file_column_name, |
1162 | 0 | remove_nullable(leaf_mapping->file_type)); |
1163 | 0 | if (predicate == nullptr) { |
1164 | 0 | return false; |
1165 | 0 | } |
1166 | 0 | std::vector<int32_t> file_child_id_path; |
1167 | 0 | if (!extract_child_id_path_from_projection(file_projection, &file_child_id_path)) { |
1168 | 0 | return false; |
1169 | 0 | } |
1170 | 0 | column_filter->file_column_id = file_projection.column_id(); |
1171 | 0 | column_filter->file_child_id_path = std::move(file_child_id_path); |
1172 | 0 | column_filter->predicates.push_back(std::move(predicate)); |
1173 | 0 | return true; |
1174 | 0 | } |
1175 | | |
1176 | | static void merge_column_predicate_filter(FileColumnPredicateFilter column_filter, |
1177 | 0 | std::vector<FileColumnPredicateFilter>* filters) { |
1178 | 0 | DORIS_CHECK(filters != nullptr); |
1179 | 0 | auto existing_filter_it = std::ranges::find_if(*filters, [&](const auto& existing_filter) { |
1180 | 0 | return existing_filter.file_column_id == column_filter.file_column_id && |
1181 | 0 | existing_filter.file_child_id_path == column_filter.file_child_id_path; |
1182 | 0 | }); |
1183 | 0 | if (existing_filter_it == filters->end()) { |
1184 | 0 | filters->push_back(std::move(column_filter)); |
1185 | 0 | return; |
1186 | 0 | } |
1187 | 0 | existing_filter_it->predicates.insert(existing_filter_it->predicates.end(), |
1188 | 0 | column_filter.predicates.begin(), |
1189 | 0 | column_filter.predicates.end()); |
1190 | 0 | } |
1191 | | |
1192 | | static void collect_nested_column_predicate_filters( |
1193 | | const VExprSPtr& expr, const std::vector<ColumnMapping>& mappings, |
1194 | 0 | std::vector<FileColumnPredicateFilter>* filters) { |
1195 | 0 | DORIS_CHECK(filters != nullptr); |
1196 | 0 | if (expr == nullptr) { |
1197 | 0 | return; |
1198 | 0 | } |
1199 | 0 | if (expr->node_type() == TExprNodeType::COMPOUND_PRED && |
1200 | 0 | expr->op() == TExprOpcode::COMPOUND_AND) { |
1201 | 0 | for (const auto& child : expr->children()) { |
1202 | 0 | collect_nested_column_predicate_filters(child, mappings, filters); |
1203 | 0 | } |
1204 | 0 | return; |
1205 | 0 | } |
1206 | 0 | FileColumnPredicateFilter column_filter; |
1207 | 0 | if (extract_nested_binary_comparison_filter(expr, mappings, &column_filter) || |
1208 | 0 | extract_nested_in_list_filter(expr, mappings, &column_filter)) { |
1209 | 0 | merge_column_predicate_filter(std::move(column_filter), filters); |
1210 | 0 | } |
1211 | 0 | } |
1212 | | |
1213 | | static Status build_projected_type_from_projection(const DataTypePtr& file_type, |
1214 | | const std::vector<ColumnDefinition>& children, |
1215 | | const LocalColumnIndex& projection, |
1216 | 0 | DataTypePtr* projected_type) { |
1217 | 0 | DORIS_CHECK(file_type != nullptr); |
1218 | 0 | DORIS_CHECK(projected_type != nullptr); |
1219 | 0 | ColumnDefinition field; |
1220 | 0 | field.type = file_type; |
1221 | 0 | field.children = children; |
1222 | 0 | ColumnDefinition projected_field; |
1223 | 0 | RETURN_IF_ERROR(project_column_definition(field, projection, &projected_field)); |
1224 | 0 | *projected_type = std::move(projected_field.type); |
1225 | 0 | return Status::OK(); |
1226 | 0 | } |
1227 | | |
1228 | | static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr, |
1229 | | const FileSlotRewriteInfo& rewrite_info, |
1230 | 0 | RewriteContext* rewrite_context) { |
1231 | 0 | DORIS_CHECK(literal_expr != nullptr); |
1232 | 0 | DORIS_CHECK(literal_expr->is_literal()); |
1233 | 0 | const auto original_literal = original_table_literal(literal_expr, rewrite_context); |
1234 | 0 | const Field original_field = literal_field(original_literal); |
1235 | 0 | if (rewrite_info.file_type->equals(*original_literal->data_type())) { |
1236 | 0 | return original_literal; |
1237 | 0 | } |
1238 | 0 | Field file_field; |
1239 | 0 | try { |
1240 | 0 | convert_field_to_type(original_field, *rewrite_info.file_type, &file_field, |
1241 | 0 | original_literal->data_type().get()); |
1242 | 0 | } catch (const Exception&) { |
1243 | 0 | return nullptr; |
1244 | 0 | } |
1245 | 0 | if (file_field.is_null()) { |
1246 | 0 | return nullptr; |
1247 | 0 | } |
1248 | 0 | if (file_field.get_type() != remove_nullable(rewrite_info.file_type)->get_primitive_type()) { |
1249 | 0 | return nullptr; |
1250 | 0 | } |
1251 | 0 | auto literal = std::make_shared<SplitLocalFileLiteral>( |
1252 | 0 | rewrite_info.file_type, file_field, original_literal->data_type(), original_field); |
1253 | 0 | rewrite_context->add_created_expr(literal); |
1254 | 0 | return literal; |
1255 | 0 | } |
1256 | | |
1257 | | static bool rewrite_binary_slot_literal_predicate( |
1258 | | const VExprSPtr& expr, |
1259 | | const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot, |
1260 | 0 | RewriteContext* rewrite_context) { |
1261 | 0 | if (!is_binary_comparison_predicate(expr)) { |
1262 | 0 | return false; |
1263 | 0 | } |
1264 | 0 | auto children = expr->children(); |
1265 | 0 | const VSlotRef* slot_ref = nullptr; |
1266 | 0 | const FileSlotRewriteInfo* rewrite_info = |
1267 | 0 | find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref); |
1268 | 0 | int slot_child_idx = 0; |
1269 | 0 | int literal_child_idx = 1; |
1270 | 0 | if (rewrite_info == nullptr) { |
1271 | 0 | rewrite_info = find_slot_rewrite_info(children[1], global_to_file_slot, &slot_ref); |
1272 | 0 | slot_child_idx = 1; |
1273 | 0 | literal_child_idx = 0; |
1274 | 0 | } |
1275 | 0 | if (rewrite_info == nullptr || slot_ref == nullptr) { |
1276 | 0 | return false; |
1277 | 0 | } |
1278 | 0 | auto literal_expr = |
1279 | 0 | unwrap_literal_for_file_cast(children[literal_child_idx], rewrite_info->table_type); |
1280 | 0 | if (literal_expr == nullptr) { |
1281 | 0 | return false; |
1282 | 0 | } |
1283 | | |
1284 | 0 | auto rewritten_literal = |
1285 | 0 | rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context); |
1286 | 0 | if (rewritten_literal == nullptr) { |
1287 | 0 | children[literal_child_idx] = original_table_literal(literal_expr, rewrite_context); |
1288 | 0 | expr->set_children(std::move(children)); |
1289 | 0 | return false; |
1290 | 0 | } |
1291 | | |
1292 | 0 | children[slot_child_idx] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context); |
1293 | 0 | children[literal_child_idx] = std::move(rewritten_literal); |
1294 | 0 | expr->set_children(std::move(children)); |
1295 | 0 | return true; |
1296 | 0 | } |
1297 | | |
1298 | | static bool rewrite_in_slot_literal_predicate( |
1299 | | const VExprSPtr& expr, |
1300 | | const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot, |
1301 | 0 | RewriteContext* rewrite_context) { |
1302 | 0 | if (expr->node_type() != TExprNodeType::IN_PRED || expr->get_num_children() < 2) { |
1303 | 0 | return false; |
1304 | 0 | } |
1305 | 0 | auto children = expr->children(); |
1306 | 0 | const VSlotRef* slot_ref = nullptr; |
1307 | 0 | const FileSlotRewriteInfo* rewrite_info = |
1308 | 0 | find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref); |
1309 | 0 | if (rewrite_info == nullptr || slot_ref == nullptr) { |
1310 | 0 | return false; |
1311 | 0 | } |
1312 | | |
1313 | 0 | VExprSPtrs rewritten_literals; |
1314 | 0 | rewritten_literals.reserve(children.size() - 1); |
1315 | 0 | for (size_t child_idx = 1; child_idx < children.size(); ++child_idx) { |
1316 | 0 | auto literal_expr = |
1317 | 0 | unwrap_literal_for_file_cast(children[child_idx], rewrite_info->table_type); |
1318 | 0 | if (literal_expr == nullptr) { |
1319 | 0 | return false; |
1320 | 0 | } |
1321 | 0 | auto rewritten_literal = |
1322 | 0 | rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context); |
1323 | 0 | if (rewritten_literal == nullptr) { |
1324 | 0 | for (size_t restore_idx = 1; restore_idx < children.size(); ++restore_idx) { |
1325 | 0 | auto restore_literal = unwrap_literal_for_file_cast(children[restore_idx], |
1326 | 0 | rewrite_info->table_type); |
1327 | 0 | if (restore_literal != nullptr) { |
1328 | 0 | children[restore_idx] = |
1329 | 0 | original_table_literal(restore_literal, rewrite_context); |
1330 | 0 | } |
1331 | 0 | } |
1332 | 0 | expr->set_children(std::move(children)); |
1333 | 0 | return false; |
1334 | 0 | } |
1335 | 0 | rewritten_literals.push_back(std::move(rewritten_literal)); |
1336 | 0 | } |
1337 | | |
1338 | 0 | children[0] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context); |
1339 | 0 | for (size_t literal_idx = 0; literal_idx < rewritten_literals.size(); ++literal_idx) { |
1340 | 0 | children[literal_idx + 1] = std::move(rewritten_literals[literal_idx]); |
1341 | 0 | } |
1342 | 0 | expr->set_children(std::move(children)); |
1343 | 0 | return true; |
1344 | 0 | } |
1345 | | |
1346 | | static VExprSPtr rewrite_table_expr_to_file_expr( |
1347 | | const VExprSPtr& expr, |
1348 | | const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot, |
1349 | 0 | RewriteContext* rewrite_context) { |
1350 | 0 | if (expr == nullptr) { |
1351 | 0 | return nullptr; |
1352 | 0 | } |
1353 | 0 | DORIS_CHECK(rewrite_context != nullptr); |
1354 | 0 | if (rewrite_binary_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) { |
1355 | 0 | return expr; |
1356 | 0 | } |
1357 | 0 | if (rewrite_in_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) { |
1358 | 0 | return expr; |
1359 | 0 | } |
1360 | 0 | if (is_struct_element_expr(expr)) { |
1361 | 0 | auto children = expr->children(); |
1362 | 0 | if (children[0]->is_slot_ref()) { |
1363 | 0 | const auto* slot_ref = assert_cast<const VSlotRef*>(children[0].get()); |
1364 | 0 | const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref)); |
1365 | 0 | if (rewrite_it != global_to_file_slot.end()) { |
1366 | | // struct_element must see the actual file struct layout. Casting the parent struct |
1367 | | // to the output projection can hide filter-only children such as `s.id` in |
1368 | | // `SELECT s.name WHERE s.id > 5`. |
1369 | 0 | children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context); |
1370 | 0 | expr->set_children(std::move(children)); |
1371 | 0 | return expr; |
1372 | 0 | } |
1373 | 0 | } |
1374 | 0 | children[0] = |
1375 | 0 | rewrite_table_expr_to_file_expr(children[0], global_to_file_slot, rewrite_context); |
1376 | 0 | expr->set_children(std::move(children)); |
1377 | 0 | return expr; |
1378 | 0 | } |
1379 | 0 | if (expr->is_slot_ref()) { |
1380 | 0 | const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get()); |
1381 | 0 | const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref)); |
1382 | 0 | if (rewrite_it != global_to_file_slot.end()) { |
1383 | 0 | const auto& rewrite_info = rewrite_it->second; |
1384 | 0 | auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info, rewrite_context); |
1385 | 0 | if (rewrite_info.file_type->equals(*rewrite_info.table_type)) { |
1386 | 0 | return file_slot; |
1387 | 0 | } |
1388 | 0 | auto cast_expr = Cast::create_shared(rewrite_info.table_type); |
1389 | 0 | cast_expr->add_child(std::move(file_slot)); |
1390 | 0 | rewrite_context->add_created_expr(cast_expr); |
1391 | 0 | return cast_expr; |
1392 | 0 | } |
1393 | 0 | return expr; |
1394 | 0 | } |
1395 | | // The input is a split-local cloned tree. A previous split-local clone may already have |
1396 | | // inserted Cast(slot). Keep that rewrite idempotent: rewrite the cast child from table slot to |
1397 | | // the current split's file slot, and drop the cast when the current split no longer needs it. |
1398 | 0 | if (is_cast_expr(expr) && expr->get_num_children() == 1) { |
1399 | 0 | const auto& child = expr->children()[0]; |
1400 | 0 | if (child->is_slot_ref()) { |
1401 | 0 | const auto* slot_ref = assert_cast<const VSlotRef*>(child.get()); |
1402 | 0 | const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref)); |
1403 | 0 | if (rewrite_it != global_to_file_slot.end() && |
1404 | 0 | expr->data_type()->equals(*rewrite_it->second.table_type)) { |
1405 | 0 | auto rewritten_child = |
1406 | 0 | create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context); |
1407 | 0 | if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) { |
1408 | 0 | return rewritten_child; |
1409 | 0 | } |
1410 | 0 | expr->set_children({std::move(rewritten_child)}); |
1411 | 0 | return expr; |
1412 | 0 | } |
1413 | 0 | } |
1414 | 0 | } |
1415 | | |
1416 | 0 | VExprSPtrs rewritten_children; |
1417 | 0 | rewritten_children.reserve(expr->children().size()); |
1418 | 0 | for (const auto& child : expr->children()) { |
1419 | 0 | rewritten_children.push_back( |
1420 | 0 | rewrite_table_expr_to_file_expr(child, global_to_file_slot, rewrite_context)); |
1421 | 0 | } |
1422 | 0 | expr->set_children(std::move(rewritten_children)); |
1423 | 0 | return expr; |
1424 | 0 | } |
1425 | | |
// Table column names recognised by TableColumnMapper::create_mapping() as row-lineage virtual
// columns. When no partition value, file column or default expression applies, a column with one
// of these names is mapped to TableVirtualColumnType::ROW_ID or
// TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER respectively.
static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_updated_sequence_number";
1428 | | |
1429 | 0 | static bool complex_projection_has_pruned_children(const ColumnMapping& mapping) { |
1430 | 0 | if (!is_complex_type(mapping.file_type->get_primitive_type())) { |
1431 | 0 | return false; |
1432 | 0 | } |
1433 | 0 | if (mapping.child_mappings.empty()) { |
1434 | 0 | return false; |
1435 | 0 | } |
1436 | 0 | DORIS_CHECK(mapping.file_type != nullptr); |
1437 | 0 | DORIS_CHECK(mapping.table_type != nullptr); |
1438 | 0 | if (remove_nullable(mapping.file_type)->get_primitive_type() != |
1439 | 0 | remove_nullable(mapping.table_type)->get_primitive_type()) { |
1440 | 0 | return true; |
1441 | 0 | } |
1442 | 0 | if (!mapping.table_type->equals(*mapping.file_type)) { |
1443 | 0 | return true; |
1444 | 0 | } |
1445 | 0 | for (const auto& child_mapping : mapping.child_mappings) { |
1446 | | // `child_mapping.table_column_name != child_mapping.file_column_name` means this column is renamed |
1447 | | // `!child_mapping.file_local_id.has_value()` means this column is miss in file |
1448 | 0 | if (child_mapping.table_column_name != child_mapping.file_column_name || |
1449 | 0 | !child_mapping.file_local_id.has_value() || |
1450 | 0 | complex_projection_has_pruned_children(child_mapping)) { |
1451 | 0 | return true; |
1452 | 0 | } |
1453 | 0 | } |
1454 | 0 | return false; |
1455 | 0 | } |
1456 | | |
1457 | | // Build the projected file type according to the pruned complex projection. For example, if we |
1458 | | // have a struct column `s` with children `id` and `name`, and the projection only keeps `s.name`, |
1459 | | // then we need to build the projected file type of `s` to only contain `name` so that the file |
1460 | | // reader can read it correctly. |
1461 | | static Status build_projected_child_type(const DataTypePtr& file_type, |
1462 | | const std::vector<ColumnMapping>& child_mappings, |
1463 | 0 | DataTypePtr* projected_type) { |
1464 | 0 | DORIS_CHECK(file_type != nullptr); |
1465 | 0 | DORIS_CHECK(projected_type != nullptr); |
1466 | 0 | DataTypes child_types; |
1467 | 0 | Strings child_names; |
1468 | 0 | child_types.reserve(child_mappings.size()); |
1469 | 0 | child_names.reserve(child_mappings.size()); |
1470 | 0 | for (const auto& child_mapping : child_mappings) { |
1471 | 0 | if (!child_mapping.file_local_id.has_value()) { |
1472 | | // Missing child column |
1473 | 0 | continue; |
1474 | 0 | } |
1475 | 0 | child_types.push_back(child_mapping.file_type); |
1476 | 0 | child_names.push_back(child_mapping.file_column_name); |
1477 | 0 | } |
1478 | 0 | return rebuild_projected_type(file_type, child_types, child_names, projected_type); |
1479 | 0 | } |
1480 | | |
1481 | 0 | static Status build_complex_projection(const ColumnMapping& mapping, LocalColumnIndex* projection) { |
1482 | 0 | if (projection == nullptr) { |
1483 | 0 | return Status::InvalidArgument("projection is null"); |
1484 | 0 | } |
1485 | 0 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1486 | 0 | *projection = LocalColumnIndex::field(*mapping.file_local_id); |
1487 | 0 | projection->project_all_children = mapping.child_mappings.empty(); |
1488 | 0 | projection->children.clear(); |
1489 | 0 | for (const auto& child_mapping : mapping.child_mappings) { |
1490 | 0 | if (!child_mapping.file_local_id.has_value()) { |
1491 | 0 | continue; |
1492 | 0 | } |
1493 | 0 | LocalColumnIndex child_projection; |
1494 | 0 | RETURN_IF_ERROR(build_complex_projection(child_mapping, &child_projection)); |
1495 | 0 | projection->children.push_back(std::move(child_projection)); |
1496 | 0 | } |
1497 | 0 | if (!projection->project_all_children && projection->children.empty()) { |
1498 | 0 | return Status::NotSupported("Projection for complex column {} contains no file children", |
1499 | 0 | mapping.file_column_name); |
1500 | 0 | } |
1501 | 0 | return Status::OK(); |
1502 | 0 | } |
1503 | | |
1504 | | // Re-build file type according to the pruned complex projection. |
1505 | | // For example, if we have a struct column `s` with children `id` and `name`, |
1506 | | // and the projection only keeps `s.name`, then we need to rebuild the file type of `s` to only |
1507 | | // contain `name` so that the file reader can read it correctly. |
1508 | 0 | static Status rebuild_projected_file_type(ColumnMapping* mapping) { |
1509 | 0 | if (mapping == nullptr) { |
1510 | 0 | return Status::InvalidArgument("mapping is null"); |
1511 | 0 | } |
1512 | 0 | if (mapping->original_file_type == nullptr) { |
1513 | 0 | mapping->original_file_type = mapping->file_type; |
1514 | 0 | } |
1515 | 0 | DORIS_CHECK( |
1516 | 0 | is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())); |
1517 | 0 | RETURN_IF_ERROR(build_projected_child_type(mapping->original_file_type, mapping->child_mappings, |
1518 | 0 | &mapping->file_type)); |
1519 | 0 | mapping->is_trivial = |
1520 | 0 | mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); |
1521 | 0 | mapping->has_complex_projection = true; |
1522 | 0 | return Status::OK(); |
1523 | 0 | } |
1524 | | |
// Nested projections required by filters, keyed by the id of the top-level file column they
// belong to (see build_filter_projection_map(), which inserts entries under
// `root_projection.column_id()`).
using FilterProjectionMap = std::map<LocalColumnId, LocalColumnIndex>;
1526 | | |
1527 | | static Status apply_projection_to_mapping_file_type(const LocalColumnIndex& projection, |
1528 | 0 | ColumnMapping* mapping) { |
1529 | 0 | DORIS_CHECK(mapping != nullptr); |
1530 | 0 | if (mapping->original_file_type == nullptr) { |
1531 | 0 | mapping->original_file_type = mapping->file_type; |
1532 | 0 | } |
1533 | 0 | if (mapping->original_file_type == nullptr || |
1534 | 0 | !is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())) { |
1535 | 0 | return Status::OK(); |
1536 | 0 | } |
1537 | 0 | DataTypePtr projected_type; |
1538 | 0 | RETURN_IF_ERROR(build_projected_type_from_projection(mapping->original_file_type, |
1539 | 0 | mapping->original_file_children, |
1540 | 0 | projection, &projected_type)); |
1541 | 0 | mapping->file_type = std::move(projected_type); |
1542 | 0 | mapping->has_complex_projection = !projection.project_all_children; |
1543 | 0 | mapping->is_trivial = |
1544 | 0 | mapping->table_type != nullptr && mapping->table_type->equals(*mapping->file_type); |
1545 | 0 | return Status::OK(); |
1546 | 0 | } |
1547 | | |
1548 | | static Status merge_filter_projection(const FilterProjectionMap* filter_projections, |
1549 | 0 | LocalColumnIndex* projection) { |
1550 | 0 | DORIS_CHECK(projection != nullptr); |
1551 | 0 | if (filter_projections == nullptr) { |
1552 | 0 | return Status::OK(); |
1553 | 0 | } |
1554 | 0 | const auto filter_projection_it = filter_projections->find(projection->column_id()); |
1555 | 0 | if (filter_projection_it == filter_projections->end()) { |
1556 | 0 | return Status::OK(); |
1557 | 0 | } |
1558 | 0 | RETURN_IF_ERROR(merge_local_column_index(projection, filter_projection_it->second)); |
1559 | 0 | return Status::OK(); |
1560 | 0 | } |
1561 | | |
1562 | | static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapping, |
1563 | | std::vector<LocalColumnIndex>* scan_columns, bool is_predicate_column, |
1564 | 0 | const FilterProjectionMap* filter_projections = nullptr) { |
1565 | 0 | const auto file_column_id = LocalColumnId(mapping->file_local_id.value()); |
1566 | 0 | if (!is_predicate_column && |
1567 | 0 | std::ranges::find_if(file_request->predicate_columns, [&](const LocalColumnIndex& p) { |
1568 | 0 | return p.column_id() == file_column_id; |
1569 | 0 | }) != file_request->predicate_columns.end()) { |
1570 | | // This column is already projected for predicates, so skip adding it to non-predicate columns. |
1571 | 0 | return Status::OK(); |
1572 | 0 | } |
1573 | | // local_positions is the global read-column index for this scan request, so it also |
1574 | | // deduplicates predicate_columns and non_predicate_columns across all filter/projection paths. |
1575 | 0 | const bool newly_added = file_request->local_positions.count(file_column_id) == 0; |
1576 | 0 | if (newly_added) { |
1577 | 0 | file_request->local_positions.emplace(file_column_id, |
1578 | 0 | LocalIndex(file_request->local_positions.size())); |
1579 | 0 | } |
1580 | 0 | LocalColumnIndex projection = LocalColumnIndex::top_level(file_column_id); |
1581 | 0 | if (mapping->has_complex_projection) { |
1582 | 0 | if (complex_projection_has_pruned_children(*mapping)) { |
1583 | 0 | RETURN_IF_ERROR(rebuild_projected_file_type(mapping)); |
1584 | 0 | } |
1585 | 0 | RETURN_IF_ERROR(build_complex_projection(*mapping, &projection)); |
1586 | 0 | } |
1587 | 0 | if (is_predicate_column) { |
1588 | 0 | DCHECK(filter_projections != nullptr); |
1589 | | // TODO: merge non-predicate projections for the same column as well, to avoid duplicated projections when the same column is used in multiple predicates. |
1590 | 0 | RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection)); |
1591 | 0 | } |
1592 | 0 | auto existing_projection_it = std::ranges::find_if( |
1593 | 0 | *scan_columns, |
1594 | 0 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }); |
1595 | 0 | auto exists = existing_projection_it != scan_columns->end(); |
1596 | 0 | if (exists) { |
1597 | 0 | RETURN_IF_ERROR(merge_local_column_index(&*existing_projection_it, projection)); |
1598 | 0 | } else { |
1599 | 0 | scan_columns->push_back(std::move(projection)); |
1600 | 0 | } |
1601 | | // FIXME: only `apply_projection_to_mapping_file_type` if exists == true ? |
1602 | 0 | RETURN_IF_ERROR(apply_projection_to_mapping_file_type( |
1603 | 0 | exists ? *existing_projection_it : scan_columns->back(), mapping)); |
1604 | 0 | if (is_predicate_column) { |
1605 | | // TODO: if the same column is used in both predicate and non-predicate projections, we can merge the two projections and only keep it in predicate_columns. |
1606 | 0 | file_request->non_predicate_columns.erase( |
1607 | 0 | std::ranges::find_if( |
1608 | 0 | file_request->non_predicate_columns, |
1609 | 0 | [&](const LocalColumnIndex& p) { return p.column_id() == file_column_id; }), |
1610 | 0 | file_request->non_predicate_columns.end()); |
1611 | 0 | } |
1612 | 0 | return Status::OK(); |
1613 | 0 | } |
1614 | | |
1615 | | static Status build_filter_projection_map(const std::vector<TableFilter>& table_filters, |
1616 | | std::vector<ColumnMapping>* mappings, |
1617 | 0 | FilterProjectionMap* filter_projections) { |
1618 | 0 | DORIS_CHECK(mappings != nullptr); |
1619 | 0 | DORIS_CHECK(filter_projections != nullptr); |
1620 | 0 | filter_projections->clear(); |
1621 | 0 | for (const auto& table_filter : table_filters) { |
1622 | 0 | if (table_filter.conjunct == nullptr) { |
1623 | 0 | continue; |
1624 | 0 | } |
1625 | 0 | std::vector<NestedStructPath> paths; |
1626 | 0 | collect_nested_struct_paths(table_filter.conjunct->root(), &paths); |
1627 | 0 | for (const auto& path : paths) { |
1628 | 0 | auto mapping_it = std::ranges::find_if(*mappings, [&](const ColumnMapping& mapping) { |
1629 | 0 | return mapping.global_index == path.root_global_index; |
1630 | 0 | }); |
1631 | 0 | if (mapping_it == mappings->end() || !mapping_it->file_local_id.has_value() || |
1632 | 0 | path.selectors.empty()) { |
1633 | 0 | continue; |
1634 | 0 | } |
1635 | | |
1636 | 0 | LocalColumnIndex root_projection; |
1637 | 0 | const ColumnMapping* leaf_mapping = nullptr; |
1638 | 0 | if (!resolve_nested_projection_with_index_mapping(path, *mappings, &root_projection, |
1639 | 0 | &leaf_mapping)) { |
1640 | 0 | LocalColumnIndex child_projection; |
1641 | 0 | RETURN_IF_ERROR(build_filter_projection_path(*mapping_it, path.selectors, |
1642 | 0 | &child_projection)); |
1643 | 0 | if (child_projection.field_id() < 0) { |
1644 | 0 | continue; |
1645 | 0 | } |
1646 | 0 | root_projection = LocalColumnIndex::partial_field(*mapping_it->file_local_id); |
1647 | 0 | root_projection.children.push_back(std::move(child_projection)); |
1648 | 0 | } |
1649 | 0 | auto filter_projection_it = filter_projections->find(root_projection.column_id()); |
1650 | 0 | if (filter_projection_it == filter_projections->end()) { |
1651 | 0 | filter_projections->emplace(root_projection.column_id(), |
1652 | 0 | std::move(root_projection)); |
1653 | 0 | continue; |
1654 | 0 | } |
1655 | 0 | RETURN_IF_ERROR( |
1656 | 0 | merge_local_column_index(&filter_projection_it->second, root_projection)); |
1657 | 0 | } |
1658 | 0 | } |
1659 | 0 | return Status::OK(); |
1660 | 0 | } |
1661 | | |
1662 | 0 | static void rebuild_projection(ColumnMapping* mapping, LocalIndex block_position) { |
1663 | 0 | DORIS_CHECK(mapping->file_local_id.has_value()); |
1664 | 0 | if (mapping->is_trivial || mapping->has_complex_projection) { |
1665 | 0 | mapping->projection = VExprContext::create_shared(TableSlotRef::create_shared( |
1666 | 0 | cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1, |
1667 | 0 | mapping->file_type, mapping->file_column_name)); |
1668 | 0 | return; |
1669 | 0 | } |
1670 | | |
1671 | 0 | auto expr = Cast::create_shared(mapping->table_type); |
1672 | 0 | expr->add_child(TableSlotRef::create_shared(cast_set<int>(block_position.value()), |
1673 | 0 | cast_set<int>(block_position.value()), -1, |
1674 | 0 | mapping->file_type, mapping->file_column_name)); |
1675 | 0 | mapping->projection = VExprContext::create_shared(expr); |
1676 | 0 | } |
1677 | | |
1678 | | // Build file slot rewrite info from the localized filter targets. Only local targets can enter |
1679 | | // file-reader expressions; constant and unset targets stay above the file reader. |
1680 | | static std::map<GlobalIndex, FileSlotRewriteInfo> build_file_slot_rewrite_map( |
1681 | | const std::vector<ColumnMapping>& mappings, |
1682 | 0 | const std::map<GlobalIndex, FilterEntry>& filter_entries) { |
1683 | 0 | std::map<GlobalIndex, FileSlotRewriteInfo> global_to_file_slot; |
1684 | 0 | for (const auto& mapping : mappings) { |
1685 | 0 | const auto entry_it = filter_entries.find(mapping.global_index); |
1686 | 0 | if (entry_it == filter_entries.end() || !entry_it->second.is_local()) { |
1687 | 0 | continue; |
1688 | 0 | } |
1689 | 0 | DORIS_CHECK(mapping.file_local_id.has_value()); |
1690 | 0 | global_to_file_slot.emplace( |
1691 | 0 | mapping.global_index, |
1692 | 0 | FileSlotRewriteInfo {.block_position = entry_it->second.local_index().value(), |
1693 | 0 | .file_type = mapping.file_type, |
1694 | 0 | .table_type = mapping.table_type, |
1695 | 0 | .file_column_name = mapping.file_column_name}); |
1696 | 0 | } |
1697 | 0 | return global_to_file_slot; |
1698 | 0 | } |
1699 | | |
1700 | | static const ColumnDefinition* find_file_child_by_table_column( |
1701 | | const ColumnDefinition& table_column, const std::vector<ColumnDefinition>& file_children, |
1702 | 0 | TableColumnMappingMode mode) { |
1703 | 0 | return matcher_for_mode(mode).find(table_column, file_children); |
1704 | 0 | } |
1705 | | |
1706 | | static const ColumnDefinition* find_file_child_for_complex_wrapper( |
1707 | | const ColumnDefinition& table_child, const ColumnDefinition& file_field, |
1708 | 0 | TableColumnMappingMode mode) { |
1709 | 0 | if (file_field.children.empty()) { |
1710 | 0 | return nullptr; |
1711 | 0 | } |
1712 | 0 | return find_file_child_by_table_column(table_child, file_field.children, mode); |
1713 | 0 | } |
1714 | | |
1715 | | Status TableColumnMapper::create_mapping(const std::vector<ColumnDefinition>& projected_columns, |
1716 | | const std::map<std::string, Field>& partition_values, |
1717 | 0 | const std::vector<ColumnDefinition>& file_schema) { |
1718 | 0 | clear(); |
1719 | 0 | for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) { |
1720 | 0 | const auto& table_column = projected_columns[column_idx]; |
1721 | 0 | ColumnMapping mapping; |
1722 | 0 | mapping.global_index = GlobalIndex(column_idx); |
1723 | 0 | mapping.table_column_name = table_column.name; |
1724 | 0 | mapping.table_type = table_column.type; |
1725 | 0 | if (const auto* partition_value = find_partition_value(table_column, partition_values); |
1726 | 0 | table_column.is_partition_key && partition_value != nullptr) { |
1727 | | // 1. Partition column, use partition value as a constant mapping. Note that partition column may also have default expression, but partition value should take precedence if it exists. |
1728 | 0 | _set_constant_mapping( |
1729 | 0 | &mapping, VExprContext::create_shared(TableLiteral::create_shared( |
1730 | 0 | mapping.table_type, *partition_value))); |
1731 | 0 | } else if (_options.mode == TableColumnMappingMode::BY_INDEX && |
1732 | 0 | !table_column.is_partition_key) { |
1733 | | // 2. BY_INDEX mapping, use the file column at the position specified by `ColumnDefinition::identifier` as a direct mapping. This mode is only used by Hive. |
1734 | 0 | RETURN_IF_ERROR(_create_by_index_mapping(table_column, file_schema, &mapping)); |
1735 | 0 | } else if (const auto* file_field = _find_file_field(table_column, file_schema)) { |
1736 | | // 3. Table column has a matching file column, use it as a direct mapping. |
1737 | 0 | RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &mapping)); |
1738 | 0 | } else if (table_column.default_expr != nullptr) { |
1739 | | // 4. Table column does not exist in file (column adding by schema evolution), which has a default expression, use it as a constant mapping. |
1740 | 0 | _set_constant_mapping(&mapping, table_column.default_expr); |
1741 | 0 | } else if (table_column.name == ROW_LINEAGE_ROW_ID) { |
1742 | | // 5. Virtual column, use special mapping to indicate it should be materialized by table reader instead of read from file or evaluated from expression. |
1743 | 0 | mapping.virtual_column_type = TableVirtualColumnType::ROW_ID; |
1744 | 0 | } else if (table_column.name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) { |
1745 | 0 | mapping.virtual_column_type = TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER; |
1746 | 0 | } else { |
1747 | 0 | if (table_column.is_partition_key) { |
1748 | 0 | return Status::InvalidArgument( |
1749 | 0 | "Table column '{}' (global_index={}) does not have a matching partition " |
1750 | 0 | "value", |
1751 | 0 | table_column.name, mapping.global_index.value()); |
1752 | 0 | } |
1753 | 0 | if (!_options.allow_missing_columns) { |
1754 | 0 | return Status::InvalidArgument( |
1755 | 0 | "Table column '{}' (global_index={}) does not have a matching file column", |
1756 | 0 | table_column.name, mapping.global_index.value()); |
1757 | 0 | } |
1758 | 0 | } |
1759 | 0 | _mappings.push_back(std::move(mapping)); |
1760 | 0 | } |
1761 | 0 | return Status::OK(); |
1762 | 0 | } |
1763 | | |
1764 | | Status TableColumnMapper::_create_by_index_mapping(const ColumnDefinition& table_column, |
1765 | | const std::vector<ColumnDefinition>& file_schema, |
1766 | 0 | ColumnMapping* mapping) { |
1767 | 0 | DORIS_CHECK(mapping != nullptr); |
1768 | 0 | DORIS_CHECK(!table_column.is_partition_key); |
1769 | | |
1770 | | // Key contract: in BY_INDEX mode, `ColumnDefinition::identifier` TYPE_INT is interpreted as the |
1771 | | // 0-based position of this column inside `file_schema`. FE writes the physical file position |
1772 | | // of each non-partition projected column into that identifier. This interpretation allows: |
1773 | | // - sparse projection: read only a subset of file columns (for example only `_col2` |
1774 | | // and `_col4`); |
1775 | | // - column reordering: table column order differs from file column order; |
1776 | | // - no many-to-one mapping: FE must guarantee that each file position is referenced by at |
1777 | | // most one table column. |
1778 | 0 | const auto file_index = table_column.get_identifier_position(); |
1779 | | |
1780 | | // Case A: file_index is in range, so build a direct positional mapping. |
1781 | | // The file column name (for example `_col0`) is intentionally ignored here. |
1782 | 0 | if (file_index >= 0 && static_cast<size_t>(file_index) < file_schema.size()) { |
1783 | 0 | return _create_direct_mapping(table_column, file_schema[static_cast<size_t>(file_index)], |
1784 | 0 | mapping); |
1785 | 0 | } |
1786 | | |
1787 | | // Case B: file_index is out of range, which means the file does not contain this column. |
1788 | | // Route it through the missing-column path used by schema evolution. |
1789 | | // B1: the table column carries a default_expr injected by FE, so use the constant branch and |
1790 | | // materialize that value for every row. |
1791 | 0 | if (table_column.default_expr != nullptr) { |
1792 | 0 | _set_constant_mapping(mapping, table_column.default_expr); |
1793 | 0 | return Status::OK(); |
1794 | 0 | } |
1795 | | // B2: if missing columns are not allowed, fail explicitly instead of silently producing |
1796 | | // NULLs and hiding the issue. |
1797 | 0 | if (!_options.allow_missing_columns) { |
1798 | 0 | return Status::InvalidArgument( |
1799 | 0 | "Table column '{}' (file_index={}) is out of range for file schema of size {}", |
1800 | 0 | table_column.name, file_index, file_schema.size()); |
1801 | 0 | } |
1802 | | // B3: if missing columns are allowed, keep the mapping empty |
1803 | | // (`file_column_id` remains `nullopt`) and let the upper finalize stage fill |
1804 | | // NULL/default values. |
1805 | 0 | return Status::OK(); |
1806 | 0 | } |
1807 | | |
1808 | 0 | void TableColumnMapper::_set_constant_mapping(ColumnMapping* mapping, VExprContextSPtr expr) { |
1809 | 0 | DORIS_CHECK(mapping != nullptr); |
1810 | 0 | DORIS_CHECK(expr != nullptr); |
1811 | 0 | mapping->default_expr = std::move(expr); |
1812 | 0 | mapping->constant_index = _constant_map.add(ConstantEntry { |
1813 | 0 | .global_index = mapping->global_index, |
1814 | 0 | .expr = mapping->default_expr, |
1815 | 0 | .type = mapping->table_type, |
1816 | 0 | }); |
1817 | 0 | mapping->filter_conversion = FilterConversionType::CONSTANT; |
1818 | 0 | } |
1819 | | |
1820 | 0 | Status TableColumnMapper::_build_filter_entries(const FileScanRequest& file_request) { |
1821 | 0 | _filter_entries.clear(); |
1822 | 0 | for (const auto& mapping : _mappings) { |
1823 | 0 | FilterEntry entry; |
1824 | 0 | if (mapping.constant_index.has_value()) { |
1825 | 0 | entry = FilterEntry::constant(*mapping.constant_index); |
1826 | 0 | } else if (mapping.file_local_id.has_value() && |
1827 | 0 | filter_conversion_has_local_source(mapping.filter_conversion)) { |
1828 | 0 | const auto local_position_it = |
1829 | 0 | file_request.local_positions.find(LocalColumnId(*mapping.file_local_id)); |
1830 | 0 | if (local_position_it != file_request.local_positions.end()) { |
1831 | 0 | entry = FilterEntry::local(local_position_it->second); |
1832 | 0 | } |
1833 | 0 | } |
1834 | 0 | _filter_entries.emplace(mapping.global_index, entry); |
1835 | 0 | } |
1836 | 0 | return Status::OK(); |
1837 | 0 | } |
1838 | | |
1839 | | Status TableColumnMapper::create_scan_request( |
1840 | | const std::vector<TableFilter>& table_filters, |
1841 | | const TableColumnPredicates& table_column_predicates, |
1842 | | const std::vector<ColumnDefinition>& projected_columns, FileScanRequest* file_request, |
1843 | 0 | RuntimeState* runtime_state) { |
1844 | | // FileReader evaluates expressions against a file-local block. This mapper owns the |
1845 | | // table-column to file-column conversion, so it also owns the file-local block positions. |
1846 | 0 | file_request->predicate_columns.clear(); |
1847 | 0 | file_request->non_predicate_columns.clear(); |
1848 | 0 | file_request->local_positions.clear(); |
1849 | 0 | file_request->conjuncts.clear(); |
1850 | 0 | file_request->delete_conjuncts.clear(); |
1851 | 0 | file_request->column_predicate_filters.clear(); |
1852 | 0 | _filter_entries.clear(); |
1853 | | // 1. Build referenced non-predicate columns |
1854 | 0 | for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) { |
1855 | 0 | const auto global_index = GlobalIndex(column_idx); |
1856 | 0 | auto* mapping = _find_mapping(global_index); |
1857 | 0 | if (mapping != nullptr && mapping->file_local_id.has_value()) { |
1858 | | // A file column can be read lazily as a non-predicate column only when it is not used |
1859 | | // by row-level expression filters. Single-column ColumnPredicate filters are pruning |
1860 | | // hints only and must not force row-level predicate materialization. |
1861 | 0 | bool used_by_filter = false; |
1862 | 0 | for (const auto& table_filter : table_filters) { |
1863 | 0 | const auto& global_indices = table_filter.global_indices; |
1864 | 0 | if (std::find(global_indices.begin(), global_indices.end(), global_index) != |
1865 | 0 | global_indices.end() && |
1866 | 0 | filter_conversion_has_local_source(mapping->filter_conversion)) { |
1867 | 0 | used_by_filter = true; |
1868 | 0 | break; |
1869 | 0 | } |
1870 | 0 | } |
1871 | 0 | if (!used_by_filter) { |
1872 | 0 | RETURN_IF_ERROR(add_scan_column(file_request, mapping, |
1873 | 0 | &file_request->non_predicate_columns, false)); |
1874 | 0 | } |
1875 | 0 | } |
1876 | 0 | } |
1877 | | // 2. Build referenced predicate columns |
1878 | 0 | RETURN_IF_ERROR( |
1879 | 0 | localize_filters(table_filters, table_column_predicates, file_request, runtime_state)); |
1880 | | // 3. Re-build projections for all referenced file columns to point to the correct file-local block positions. |
1881 | 0 | for (auto& mapping : _mappings) { |
1882 | 0 | if (!mapping.file_local_id.has_value()) { |
1883 | 0 | continue; |
1884 | 0 | } |
1885 | 0 | auto position_it = |
1886 | 0 | file_request->local_positions.find(LocalColumnId(*mapping.file_local_id)); |
1887 | 0 | DORIS_CHECK(position_it != file_request->local_positions.end()) |
1888 | 0 | << file_request->local_positions.size() << " " << *mapping.file_local_id << " " |
1889 | 0 | << mapping.file_column_name; |
1890 | 0 | rebuild_projection(&mapping, position_it->second); |
1891 | 0 | } |
1892 | 0 | return Status::OK(); |
1893 | 0 | } |
1894 | | |
1895 | 0 | ColumnMapping* TableColumnMapper::_find_mapping(GlobalIndex global_index) { |
1896 | 0 | for (auto& mapping : _mappings) { |
1897 | 0 | if (mapping.global_index == global_index) { |
1898 | 0 | return &mapping; |
1899 | 0 | } |
1900 | 0 | } |
1901 | 0 | return nullptr; |
1902 | 0 | } |
1903 | | |
1904 | | Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table_filters, |
1905 | | const TableColumnPredicates& table_column_predicates, |
1906 | | FileScanRequest* file_request, |
1907 | 0 | RuntimeState* runtime_state) { |
1908 | 0 | FilterProjectionMap filter_projections; |
1909 | 0 | RETURN_IF_ERROR(build_filter_projection_map(table_filters, &_mappings, &filter_projections)); |
1910 | 0 | for (const auto& table_filter : table_filters) { |
1911 | 0 | for (const auto& global_index : table_filter.global_indices) { |
1912 | 0 | auto* mapping = _find_mapping(global_index); |
1913 | 0 | if (mapping == nullptr || !mapping->file_local_id.has_value() || |
1914 | 0 | !filter_conversion_has_local_source(mapping->filter_conversion)) { |
1915 | 0 | continue; |
1916 | 0 | } |
1917 | 0 | RETURN_IF_ERROR(add_scan_column(file_request, mapping, &file_request->predicate_columns, |
1918 | 0 | true, &filter_projections)); |
1919 | 0 | } |
1920 | 0 | } |
1921 | 0 | RETURN_IF_ERROR(_build_filter_entries(*file_request)); |
1922 | | |
1923 | | // Build the complete table-slot rewrite map after all predicate columns have been assigned. |
1924 | | // This keeps expression localization independent from filter iteration order. |
1925 | 0 | const auto global_to_file_slot = build_file_slot_rewrite_map(_mappings, _filter_entries); |
1926 | 0 | for (const auto& table_filter : table_filters) { |
1927 | 0 | if (table_filter.conjunct != nullptr && |
1928 | 0 | table_filter_has_only_local_entries(table_filter, _filter_entries)) { |
1929 | 0 | RewriteContext rewrite_context {.runtime_state = runtime_state}; |
1930 | 0 | VExprSPtr rewrite_root; |
1931 | 0 | const auto clone_status = |
1932 | 0 | clone_table_expr_tree(table_filter.conjunct->root(), &rewrite_root); |
1933 | 0 | if (!clone_status.ok()) { |
1934 | 0 | continue; |
1935 | 0 | } |
1936 | 0 | auto localized_root = rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot, |
1937 | 0 | &rewrite_context); |
1938 | 0 | auto localized_conjunct = VExprContext::create_shared(std::move(localized_root)); |
1939 | 0 | RETURN_IF_ERROR(rewrite_context.prepare_created_exprs(localized_conjunct.get())); |
1940 | 0 | file_request->conjuncts.push_back(std::move(localized_conjunct)); |
1941 | 0 | } |
1942 | 0 | } |
1943 | 0 | for (const auto& [global_index, predicates] : table_column_predicates) { |
1944 | 0 | const auto* mapping = _find_mapping(global_index); |
1945 | 0 | const auto entry_it = _filter_entries.find(global_index); |
1946 | 0 | if (mapping == nullptr || !mapping->file_local_id.has_value() || predicates.empty() || |
1947 | 0 | entry_it == _filter_entries.end() || !entry_it->second.is_local() || |
1948 | 0 | !column_predicate_can_use_local_source(mapping->filter_conversion) || |
1949 | 0 | mapping->file_type == nullptr) { |
1950 | 0 | continue; |
1951 | 0 | } |
1952 | 0 | FileColumnPredicateFilter column_predicate_filter; |
1953 | 0 | column_predicate_filter.file_column_id = LocalColumnId(*mapping->file_local_id); |
1954 | 0 | const auto file_primitive_type = remove_nullable(mapping->file_type)->get_primitive_type(); |
1955 | 0 | for (const auto& predicate : predicates) { |
1956 | 0 | DORIS_CHECK(predicate != nullptr); |
1957 | 0 | if (predicate->primitive_type() == file_primitive_type) { |
1958 | 0 | column_predicate_filter.predicates.push_back(predicate); |
1959 | 0 | } |
1960 | 0 | } |
1961 | 0 | if (column_predicate_filter.predicates.empty()) { |
1962 | 0 | continue; |
1963 | 0 | } |
1964 | 0 | file_request->column_predicate_filters.push_back(std::move(column_predicate_filter)); |
1965 | 0 | } |
1966 | 0 | for (const auto& table_filter : table_filters) { |
1967 | 0 | if (table_filter.conjunct == nullptr || |
1968 | 0 | !table_filter_has_only_local_entries(table_filter, _filter_entries)) { |
1969 | 0 | continue; |
1970 | 0 | } |
1971 | 0 | std::vector<FileColumnPredicateFilter> nested_column_predicate_filters; |
1972 | 0 | collect_nested_column_predicate_filters(table_filter.conjunct->root(), _mappings, |
1973 | 0 | &nested_column_predicate_filters); |
1974 | 0 | for (auto& column_predicate_filter : nested_column_predicate_filters) { |
1975 | 0 | merge_column_predicate_filter(std::move(column_predicate_filter), |
1976 | 0 | &file_request->column_predicate_filters); |
1977 | 0 | } |
1978 | 0 | } |
1979 | 0 | return Status::OK(); |
1980 | 0 | } |
1981 | | |
1982 | | const ColumnDefinition* TableColumnMapper::_find_file_field( |
1983 | | const ColumnDefinition& table_column, |
1984 | 0 | const std::vector<ColumnDefinition>& file_schema) const { |
1985 | 0 | return matcher_for_mode(_options.mode).find(table_column, file_schema); |
1986 | 0 | } |
1987 | | |
1988 | | Status TableColumnMapper::_create_direct_mapping(const ColumnDefinition& table_column, |
1989 | | const ColumnDefinition& file_field, |
1990 | 0 | ColumnMapping* mapping) const { |
1991 | 0 | DORIS_CHECK(mapping != nullptr); |
1992 | 0 | DORIS_CHECK(file_field.local_id >= 0); |
1993 | 0 | mapping->file_local_id = file_field.local_id; |
1994 | 0 | mapping->table_column_name = table_column.name; |
1995 | 0 | mapping->file_column_name = file_field.name; |
1996 | 0 | mapping->original_file_type = file_field.type; |
1997 | 0 | mapping->original_file_children = file_field.children; |
1998 | 0 | mapping->file_type = file_field.type; |
1999 | 0 | mapping->is_trivial = mapping->table_type->equals(*mapping->file_type); |
2000 | 0 | mapping->filter_conversion = mapping->is_trivial ? FilterConversionType::COPY_DIRECTLY |
2001 | 0 | : FilterConversionType::CAST_FILTER; |
2002 | 0 | mapping->child_mappings.clear(); |
2003 | |
|
2004 | 0 | if (!table_column.children.empty()) { |
2005 | 0 | DORIS_CHECK(is_complex_type(mapping->file_type->get_primitive_type())); |
2006 | 0 | for (const auto& table_child : table_column.children) { |
2007 | 0 | const auto* file_child = |
2008 | 0 | find_file_child_for_complex_wrapper(table_child, file_field, _options.mode); |
2009 | 0 | if (file_child == nullptr) { |
2010 | 0 | if (!_options.allow_missing_columns) { |
2011 | 0 | return Status::InvalidArgument( |
2012 | 0 | "Table child column '{}' does not have a matching file child " |
2013 | 0 | "under column '{}'", |
2014 | 0 | table_child.name, table_column.name); |
2015 | 0 | } |
2016 | 0 | ColumnMapping child_mapping; |
2017 | 0 | child_mapping.table_column_name = table_child.name; |
2018 | 0 | child_mapping.file_column_name = table_child.name; |
2019 | 0 | child_mapping.table_type = table_child.type; |
2020 | 0 | child_mapping.file_type = table_child.type; |
2021 | 0 | child_mapping.has_complex_projection = !table_child.children.empty(); |
2022 | 0 | child_mapping.filter_conversion = FilterConversionType::FINALIZE_ONLY; |
2023 | 0 | mapping->child_mappings.push_back(std::move(child_mapping)); |
2024 | 0 | continue; |
2025 | 0 | } |
2026 | 0 | ColumnMapping child_mapping; |
2027 | 0 | child_mapping.table_column_name = table_child.name; |
2028 | 0 | child_mapping.table_type = table_child.type; |
2029 | 0 | RETURN_IF_ERROR(_create_direct_mapping(table_child, *file_child, &child_mapping)); |
2030 | 0 | mapping->child_mappings.push_back(std::move(child_mapping)); |
2031 | 0 | } |
2032 | 0 | if (complex_projection_has_pruned_children(*mapping)) { |
2033 | 0 | mapping->has_complex_projection = true; |
2034 | | // If complex projection prunes some children, we have to rebuild the projected file type to make sure the reader expression can find the correct child types by name. |
2035 | 0 | RETURN_IF_ERROR(build_projected_child_type(mapping->file_type, mapping->child_mappings, |
2036 | 0 | &mapping->file_type)); |
2037 | 0 | DCHECK(mapping->table_type != nullptr); |
2038 | 0 | mapping->is_trivial = mapping->table_type->equals(*mapping->file_type); |
2039 | | // TODO: ? READER_EXPRESSION |
2040 | 0 | mapping->filter_conversion = mapping->is_trivial |
2041 | 0 | ? FilterConversionType::COPY_DIRECTLY |
2042 | 0 | : FilterConversionType::READER_EXPRESSION; |
2043 | 0 | } |
2044 | 0 | } |
2045 | 0 | return Status::OK(); |
2046 | 0 | } |
2047 | | |
2048 | | } // namespace doris::format |