Coverage Report

Created: 2026-06-25 13:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/column_mapper.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/column_mapper.h"
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <memory>
23
#include <sstream>
24
#include <string_view>
25
#include <utility>
26
#include <vector>
27
28
#include "common/consts.h"
29
#include "common/exception.h"
30
#include "common/status.h"
31
#include "core/data_type/convert_field_to_type.h"
32
#include "core/data_type/data_type_array.h"
33
#include "core/data_type/data_type_map.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_string.h"
36
#include "core/data_type/data_type_struct.h"
37
#include "core/data_type/primitive_type.h"
38
#include "exprs/runtime_filter_expr.h"
39
#include "exprs/short_circuit_evaluation_expr.h"
40
#include "exprs/vcase_expr.h"
41
#include "exprs/vcast_expr.h"
42
#include "exprs/vcondition_expr.h"
43
#include "exprs/vectorized_fn_call.h"
44
#include "exprs/vexpr_context.h"
45
#include "exprs/vin_predicate.h"
46
#include "exprs/vliteral.h"
47
#include "format_v2/column_mapper_nested.h"
48
#include "format_v2/expr/cast.h"
49
#include "format_v2/file_reader.h"
50
#include "format_v2/schema_projection.h"
51
#include "format_v2/table_reader.h"
52
#include "gen_cpp/Exprs_types.h"
53
54
namespace doris::format {
55
56
namespace {
57
58
0
std::string mapping_mode_to_string(TableColumnMappingMode mode) {
59
0
    switch (mode) {
60
0
    case TableColumnMappingMode::BY_FIELD_ID:
61
0
        return "BY_FIELD_ID";
62
0
    case TableColumnMappingMode::BY_NAME:
63
0
        return "BY_NAME";
64
0
    case TableColumnMappingMode::BY_INDEX:
65
0
        return "BY_INDEX";
66
0
    }
67
0
    return "UNKNOWN";
68
0
}
69
70
376
bool column_has_name(const ColumnDefinition& column, const std::string& name) {
71
376
    if (to_lower(column.name) == to_lower(name)) {
72
160
        return true;
73
160
    }
74
216
    if (column.has_identifier_name() && to_lower(column.get_identifier_name()) == to_lower(name)) {
75
0
        return true;
76
0
    }
77
216
    return std::ranges::any_of(column.name_mapping, [&](const std::string& alias) {
78
1
        return to_lower(alias) == to_lower(name);
79
1
    });
80
216
}
81
82
255
bool column_names_match(const ColumnDefinition& lhs, const ColumnDefinition& rhs) {
83
255
    if (column_has_name(rhs, lhs.name)) {
84
148
        return true;
85
148
    }
86
107
    if (lhs.has_identifier_name() && column_has_name(rhs, lhs.get_identifier_name())) {
87
1
        return true;
88
1
    }
89
106
    return std::ranges::any_of(lhs.name_mapping, [&](const std::string& alias) {
90
18
        return column_has_name(rhs, alias);
91
18
    });
92
107
}
93
94
class ColumnMatcher {
95
public:
96
3
    virtual ~ColumnMatcher() = default;
97
    virtual const ColumnDefinition* find(
98
            const ColumnDefinition& table_column,
99
            const std::vector<ColumnDefinition>& file_schema) const = 0;
100
};
101
102
class FieldIdMatcher final : public ColumnMatcher {
103
public:
104
    const ColumnDefinition* find(const ColumnDefinition& table_column,
105
108
                                 const std::vector<ColumnDefinition>& file_schema) const override {
106
108
        if (!table_column.has_identifier_field_id()) {
107
7
            return nullptr;
108
7
        }
109
101
        const auto field_id = table_column.get_identifier_field_id();
110
165
        const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) {
111
165
            return field.has_identifier_field_id() && field.get_identifier_field_id() == field_id;
112
165
        });
113
101
        return field_it == file_schema.end() ? nullptr : &*field_it;
114
108
    }
115
};
116
117
class NameMatcher final : public ColumnMatcher {
118
public:
119
    const ColumnDefinition* find(const ColumnDefinition& table_column,
120
185
                                 const std::vector<ColumnDefinition>& file_schema) const override {
121
255
        const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) {
122
255
            return column_names_match(table_column, field);
123
255
        });
124
185
        return field_it == file_schema.end() ? nullptr : &*field_it;
125
185
    }
126
};
127
128
class PositionMatcher final : public ColumnMatcher {
129
public:
130
    const ColumnDefinition* find(const ColumnDefinition& table_column,
131
2
                                 const std::vector<ColumnDefinition>& file_schema) const override {
132
2
        if (!table_column.has_identifier_field_id()) {
133
2
            return nullptr;
134
2
        }
135
0
        const auto position = table_column.get_identifier_position();
136
0
        if (position < 0 || static_cast<size_t>(position) >= file_schema.size()) {
137
0
            return nullptr;
138
0
        }
139
0
        return &file_schema[static_cast<size_t>(position)];
140
0
    }
141
};
142
143
292
const ColumnMatcher& matcher_for_mode(TableColumnMappingMode mode) {
144
292
    static const FieldIdMatcher field_id_matcher;
145
292
    static const NameMatcher name_matcher;
146
292
    static const PositionMatcher position_matcher;
147
292
    switch (mode) {
148
108
    case TableColumnMappingMode::BY_FIELD_ID:
149
108
        return field_id_matcher;
150
182
    case TableColumnMappingMode::BY_NAME:
151
182
        return name_matcher;
152
2
    case TableColumnMappingMode::BY_INDEX:
153
2
        return position_matcher;
154
292
    }
155
0
    return field_id_matcher;
156
292
}
157
158
0
std::string virtual_column_type_to_string(TableVirtualColumnType type) {
159
0
    switch (type) {
160
0
    case TableVirtualColumnType::INVALID:
161
0
        return "INVALID";
162
0
    case TableVirtualColumnType::ROW_ID:
163
0
        return "ROW_ID";
164
0
    case TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
165
0
        return "LAST_UPDATED_SEQUENCE_NUMBER";
166
0
    case TableVirtualColumnType::ICEBERG_ROWID:
167
0
        return "ICEBERG_ROWID";
168
0
    }
169
0
    return "UNKNOWN";
170
0
}
171
172
0
std::string filter_conversion_type_to_string(FilterConversionType type) {
173
0
    switch (type) {
174
0
    case FilterConversionType::COPY_DIRECTLY:
175
0
        return "COPY_DIRECTLY";
176
0
    case FilterConversionType::CAST_FILTER:
177
0
        return "CAST_FILTER";
178
0
    case FilterConversionType::READER_EXPRESSION:
179
0
        return "READER_EXPRESSION";
180
0
    case FilterConversionType::FINALIZE_ONLY:
181
0
        return "FINALIZE_ONLY";
182
0
    case FilterConversionType::CONSTANT:
183
0
        return "CONSTANT";
184
0
    }
185
0
    return "UNKNOWN";
186
0
}
187
188
0
std::string data_type_debug_string(const DataTypePtr& type) {
189
0
    return type == nullptr ? "null" : type->get_name();
190
0
}
191
192
const Field* find_partition_value(const ColumnDefinition& table_column,
193
230
                                  const std::map<std::string, Field>& partition_values) {
194
366
    const auto find_by_name = [&](const std::string& name) -> const Field* {
195
366
        const auto value_it = partition_values.find(name);
196
366
        return value_it == partition_values.end() ? nullptr : &value_it->second;
197
366
    };
198
230
    if (const auto* value = find_by_name(table_column.name); value != nullptr) {
199
9
        return value;
200
9
    }
201
221
    if (table_column.has_identifier_name()) {
202
130
        if (const auto* value = find_by_name(table_column.get_identifier_name());
203
130
            value != nullptr) {
204
0
            return value;
205
0
        }
206
130
    }
207
221
    for (const auto& alias : table_column.name_mapping) {
208
6
        if (const auto* value = find_by_name(alias); value != nullptr) {
209
1
            return value;
210
1
        }
211
6
    }
212
220
    return nullptr;
213
221
}
214
215
0
std::string field_debug_string(const Field& field) {
216
0
    std::ostringstream out;
217
0
    out << "Field{type=" << type_to_string(field.get_type()) << ", value=";
218
0
    switch (field.get_type()) {
219
0
    case TYPE_NULL:
220
0
        out << "null";
221
0
        break;
222
0
    case TYPE_INT:
223
0
        out << field.get<TYPE_INT>();
224
0
        break;
225
0
    case TYPE_BIGINT:
226
0
        out << field.get<TYPE_BIGINT>();
227
0
        break;
228
0
    case TYPE_STRING:
229
0
        out << field.get<TYPE_STRING>();
230
0
        break;
231
0
    default:
232
0
        out << field.to_debug_string(0);
233
0
        break;
234
0
    }
235
0
    out << "}";
236
0
    return out.str();
237
0
}
238
239
template <typename T, typename Formatter>
240
0
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
241
0
    std::ostringstream out;
242
0
    out << "[";
243
0
    for (size_t i = 0; i < values.size(); ++i) {
244
0
        if (i > 0) {
245
0
            out << ", ";
246
0
        }
247
0
        out << formatter(values[i]);
248
0
    }
249
0
    out << "]";
250
0
    return out.str();
251
0
}
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEZNKS0_16ColumnDefinition12debug_stringEvE3$_0EES8_RKSt6vectorIT_SaISC_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16ColumnDefinitionEZNKS3_12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16LocalColumnIndexEZNKS3_12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16ColumnDefinitionEZNKS0_13ColumnMapping12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS3_12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS0_17TableColumnMapper12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Unexecuted instantiation: column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS0_17TableColumnMapper12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
252
253
} // namespace
254
255
struct FileSlotRewriteInfo {
256
    size_t block_position = 0;
257
    DataTypePtr file_type;
258
    DataTypePtr table_type;
259
    std::string file_column_name;
260
};
261
262
struct RewriteContext {
263
    RuntimeState* runtime_state = nullptr;
264
    std::vector<VExprSPtr> created_exprs {};
265
266
112
    void add_created_expr(VExprSPtr expr) { created_exprs.push_back(std::move(expr)); }
267
268
54
    Status prepare_created_exprs(VExprContext* context) const {
269
54
        DORIS_CHECK(context != nullptr);
270
54
        RowDescriptor row_desc;
271
111
        for (const auto& expr : created_exprs) {
272
111
            if (dynamic_cast<const Cast*>(expr.get()) != nullptr && runtime_state == nullptr) {
273
0
                return Status::InvalidArgument(
274
0
                        "RuntimeState is required to prepare rewritten cast expression {}",
275
0
                        expr->expr_name());
276
0
            }
277
111
            RETURN_IF_ERROR(expr->prepare(runtime_state, row_desc, context));
278
111
        }
279
54
        return Status::OK();
280
54
    }
281
};
282
283
static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref,
284
                                      const FileSlotRewriteInfo& rewrite_info,
285
60
                                      RewriteContext* rewrite_context) {
286
60
    auto ref =
287
60
            VSlotRef::create_shared(slot_ref.slot_id(), cast_set<int>(rewrite_info.block_position),
288
60
                                    -1, rewrite_info.file_type, rewrite_info.file_column_name);
289
60
    rewrite_context->add_created_expr(ref);
290
60
    return ref;
291
60
}
292
293
208
static bool is_cast_expr(const VExprSPtr& expr) {
294
208
    return dynamic_cast<const Cast*>(expr.get()) != nullptr;
295
208
}
296
297
136
static bool is_binary_comparison_predicate(const VExprSPtr& expr) {
298
136
    if (expr == nullptr || expr->get_num_children() != 2 ||
299
136
        (expr->node_type() != TExprNodeType::BINARY_PRED &&
300
95
         expr->node_type() != TExprNodeType::NULL_AWARE_BINARY_PRED)) {
301
95
        return false;
302
95
    }
303
41
    switch (expr->op()) {
304
1
    case TExprOpcode::EQ:
305
1
    case TExprOpcode::EQ_FOR_NULL:
306
1
    case TExprOpcode::NE:
307
1
    case TExprOpcode::GE:
308
38
    case TExprOpcode::GT:
309
38
    case TExprOpcode::LE:
310
41
    case TExprOpcode::LT:
311
41
        return true;
312
0
    default:
313
0
        return false;
314
41
    }
315
41
}
316
317
0
std::string TableColumnMapperOptions::debug_string() const {
318
0
    std::ostringstream out;
319
0
    out << "TableColumnMapperOptions{mode=" << mapping_mode_to_string(mode) << "}";
320
0
    return out.str();
321
0
}
322
323
0
std::string ColumnDefinition::debug_string() const {
324
0
    std::ostringstream out;
325
0
    out << "ColumnDefinition{name=" << name << ", identifier=" << field_debug_string(identifier)
326
0
        << ", name_mapping="
327
0
        << join_debug_strings(name_mapping, [](const std::string& name) { return name; })
328
0
        << ", local_id=" << local_id << ", type=" << data_type_debug_string(type) << ", children="
329
0
        << join_debug_strings(children,
330
0
                              [](const ColumnDefinition& child) { return child.debug_string(); })
331
0
        << ", has_default_expr=" << (default_expr != nullptr)
332
0
        << ", is_partition_key=" << is_partition_key << "}";
333
0
    return out.str();
334
0
}
335
336
0
std::string LocalColumnIndex::debug_string() const {
337
0
    std::ostringstream out;
338
0
    out << "LocalColumnIndex{index=" << index << ", project_all_children=" << project_all_children
339
0
        << ", children="
340
0
        << join_debug_strings(children,
341
0
                              [](const LocalColumnIndex& child) { return child.debug_string(); })
342
0
        << "}";
343
0
    return out.str();
344
0
}
345
346
0
std::string ColumnMapping::debug_string() const {
347
0
    std::ostringstream out;
348
0
    out << "ColumnMapping{global_index=" << global_index
349
0
        << ", table_column_name=" << table_column_name << ", file_local_id=";
350
0
    if (file_local_id.has_value()) {
351
0
        out << *file_local_id;
352
0
    } else {
353
0
        out << "null";
354
0
    }
355
0
    out << ", constant_index=";
356
0
    if (constant_index.has_value()) {
357
0
        out << *constant_index;
358
0
    } else {
359
0
        out << "null";
360
0
    }
361
0
    out << ", file_column_name=" << file_column_name
362
0
        << ", original_file_type=" << data_type_debug_string(original_file_type)
363
0
        << ", original_file_children="
364
0
        << join_debug_strings(original_file_children,
365
0
                              [](const ColumnDefinition& child) { return child.debug_string(); })
366
0
        << ", file_type=" << data_type_debug_string(file_type)
367
0
        << ", table_type=" << data_type_debug_string(table_type)
368
0
        << ", has_projection=" << (projection != nullptr) << ", child_mappings="
369
0
        << join_debug_strings(child_mappings,
370
0
                              [](const ColumnMapping& child) { return child.debug_string(); })
371
0
        << ", is_trivial=" << is_trivial << ", is_constant=" << constant_index.has_value()
372
0
        << ", filter_conversion=" << filter_conversion_type_to_string(filter_conversion)
373
0
        << ", virtual_column_type=" << virtual_column_type_to_string(virtual_column_type)
374
0
        << ", has_default_expr=" << (default_expr != nullptr) << "}";
375
0
    return out.str();
376
0
}
377
378
0
std::string TableColumnMapper::debug_string() const {
379
0
    std::ostringstream out;
380
0
    out << "TableColumnMapper{options=" << _options.debug_string() << ", mappings="
381
0
        << join_debug_strings(_mappings,
382
0
                              [](const ColumnMapping& mapping) { return mapping.debug_string(); })
383
0
        << ", hidden_mappings="
384
0
        << join_debug_strings(_hidden_mappings,
385
0
                              [](const ColumnMapping& mapping) { return mapping.debug_string(); })
386
0
        << ", constant_count=" << _constant_map.size() << "}";
387
0
    return out.str();
388
0
}
389
390
static const FileSlotRewriteInfo* find_slot_rewrite_info(
391
        const VExprSPtr& expr,
392
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
393
67
        const VSlotRef** slot_ref) {
394
67
    if (expr == nullptr) {
395
0
        return nullptr;
396
0
    }
397
67
    VExprSPtr slot_expr = expr;
398
67
    const bool input_is_cast = is_cast_expr(expr) && expr->get_num_children() == 1;
399
67
    if (is_cast_expr(expr) && expr->get_num_children() == 1) {
400
3
        slot_expr = expr->children()[0];
401
3
    }
402
67
    if (!slot_expr->is_slot_ref()) {
403
40
        return nullptr;
404
40
    }
405
27
    const auto* candidate_slot_ref = assert_cast<const VSlotRef*>(slot_expr.get());
406
27
    const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*candidate_slot_ref));
407
27
    if (rewrite_it == global_to_file_slot.end()) {
408
0
        return nullptr;
409
0
    }
410
27
    if (input_is_cast && !expr->data_type()->equals(*rewrite_it->second.table_type)) {
411
1
        return nullptr;
412
1
    }
413
26
    if (slot_ref != nullptr) {
414
26
        *slot_ref = candidate_slot_ref;
415
26
    }
416
26
    return &rewrite_it->second;
417
27
}
418
419
276
static bool filter_conversion_has_local_source(FilterConversionType conversion) {
420
276
    switch (conversion) {
421
201
    case FilterConversionType::COPY_DIRECTLY:
422
243
    case FilterConversionType::CAST_FILTER:
423
258
    case FilterConversionType::READER_EXPRESSION:
424
258
        return true;
425
18
    case FilterConversionType::FINALIZE_ONLY:
426
18
    case FilterConversionType::CONSTANT:
427
18
        return false;
428
276
    }
429
0
    return false;
430
276
}
431
432
9
static bool column_predicate_can_use_local_source(FilterConversionType conversion) {
433
9
    switch (conversion) {
434
9
    case FilterConversionType::COPY_DIRECTLY:
435
9
        return true;
436
0
    case FilterConversionType::CAST_FILTER:
437
0
    case FilterConversionType::READER_EXPRESSION:
438
0
    case FilterConversionType::FINALIZE_ONLY:
439
0
    case FilterConversionType::CONSTANT:
440
0
        return false;
441
9
    }
442
0
    return false;
443
9
}
444
445
static bool table_filter_has_only_local_entries(
446
136
        const TableFilter& table_filter, const std::map<GlobalIndex, FilterEntry>& filter_entries) {
447
138
    for (const auto global_index : table_filter.global_indices) {
448
138
        const auto entry_it = filter_entries.find(global_index);
449
138
        if (entry_it == filter_entries.end() || !entry_it->second.is_local()) {
450
22
            return false;
451
22
        }
452
138
    }
453
114
    return true;
454
136
}
455
456
static VExprSPtr unwrap_literal_for_file_cast(const VExprSPtr& expr,
457
31
                                              const DataTypePtr& table_type) {
458
31
    if (expr == nullptr) {
459
0
        return nullptr;
460
0
    }
461
31
    if (expr->is_literal()) {
462
31
        return expr;
463
31
    }
464
0
    if (is_cast_expr(expr) && expr->get_num_children() == 1 && expr->children()[0]->is_literal() &&
465
0
        expr->children()[0]->data_type()->equals(*table_type)) {
466
0
        return expr->children()[0];
467
0
    }
468
0
    return nullptr;
469
0
}
470
471
0
static Field literal_field_from_expr(const VExpr& literal_expr) {
472
0
    DORIS_CHECK(literal_expr.is_literal());
473
0
    const auto* literal = dynamic_cast<const VLiteral*>(&literal_expr);
474
0
    DORIS_CHECK(literal != nullptr);
475
0
    Field field;
476
0
    literal->get_column_ptr()->get(0, field);
477
0
    return field;
478
0
}
479
480
// Table filter localization clones an already-prepared table expr and then rewrites it to file
481
// slots. Only split-local literals and BE cast nodes need table-reader-specific clone behavior;
482
// plain slot refs and literals use their own VExpr::clone_node().
483
339
static Status clone_table_expr_node(const VExpr& expr, VExprSPtr* cloned_expr) {
484
339
    DORIS_CHECK(cloned_expr != nullptr);
485
339
    if (const auto* split_literal = dynamic_cast<const SplitLocalFileLiteral*>(&expr)) {
486
0
        *cloned_expr = std::make_shared<SplitLocalFileLiteral>(
487
0
                split_literal->data_type(), literal_field_from_expr(expr),
488
0
                split_literal->original_type(), split_literal->original_field());
489
339
    } else if (const auto* vcast_expr = dynamic_cast<const VCastExpr*>(&expr);
490
339
               vcast_expr != nullptr && vcast_expr->node_type() == TExprNodeType::CAST_EXPR) {
491
0
        *cloned_expr = Cast::create_shared(vcast_expr->data_type());
492
0
    }
493
339
    return Status::OK();
494
339
}
495
496
86
Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr) {
497
86
    DORIS_CHECK(cloned_expr != nullptr);
498
86
    if (expr == nullptr) {
499
0
        *cloned_expr = nullptr;
500
0
        return Status::OK();
501
0
    }
502
86
    return expr->deep_clone(cloned_expr, clone_table_expr_node);
503
86
}
504
505
static VExprSPtr original_table_literal(const VExprSPtr& literal_expr,
506
32
                                        RewriteContext* rewrite_context = nullptr) {
507
32
    DORIS_CHECK(literal_expr != nullptr);
508
32
    DORIS_CHECK(literal_expr->is_literal());
509
32
    const auto* rewritten_literal = dynamic_cast<const SplitLocalFileLiteral*>(literal_expr.get());
510
32
    if (rewritten_literal == nullptr) {
511
32
        return literal_expr;
512
32
    }
513
0
    auto literal = VLiteral::create_shared(rewritten_literal->original_type(),
514
0
                                           rewritten_literal->original_field());
515
0
    if (rewrite_context != nullptr) {
516
0
        rewrite_context->add_created_expr(literal);
517
0
    }
518
0
    return literal;
519
32
}
520
521
68
static ColumnDefinition hidden_column_from_slot_ref(const VSlotRef& slot_ref) {
522
68
    ColumnDefinition column;
523
68
    column.name = slot_ref.column_name();
524
68
    column.identifier = Field::create_field<TYPE_STRING>(column.name);
525
68
    column.type = slot_ref.data_type();
526
68
    return column;
527
68
}
528
529
static void collect_top_level_slot_columns(const VExprSPtr& expr,
530
266
                                           std::map<GlobalIndex, ColumnDefinition>* columns) {
531
266
    DORIS_CHECK(columns != nullptr);
532
266
    if (expr == nullptr) {
533
0
        return;
534
0
    }
535
266
    if (expr->is_slot_ref()) {
536
68
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
537
68
        columns->try_emplace(slot_ref_global_index(*slot_ref),
538
68
                             hidden_column_from_slot_ref(*slot_ref));
539
68
        return;
540
68
    }
541
203
    for (const auto& child : expr->children()) {
542
203
        collect_top_level_slot_columns(child, columns);
543
203
    }
544
198
}
545
546
static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr,
547
                                              const FileSlotRewriteInfo& rewrite_info,
548
29
                                              RewriteContext* rewrite_context) {
549
29
    DORIS_CHECK(literal_expr != nullptr);
550
29
    DORIS_CHECK(literal_expr->is_literal());
551
29
    const auto original_literal = original_table_literal(literal_expr, rewrite_context);
552
29
    const Field original_field = literal_field(original_literal);
553
29
    if (rewrite_info.file_type->equals(*original_literal->data_type())) {
554
5
        return original_literal;
555
5
    }
556
24
    Field file_field;
557
24
    try {
558
24
        convert_field_to_type(original_field, *rewrite_info.file_type, &file_field,
559
24
                              original_literal->data_type().get());
560
24
    } catch (const Exception&) {
561
2
        return nullptr;
562
2
    }
563
22
    if (file_field.is_null()) {
564
0
        return nullptr;
565
0
    }
566
22
    if (file_field.get_type() != remove_nullable(rewrite_info.file_type)->get_primitive_type()) {
567
0
        return nullptr;
568
0
    }
569
22
    auto literal = std::make_shared<SplitLocalFileLiteral>(
570
22
            rewrite_info.file_type, file_field, original_literal->data_type(), original_field);
571
22
    rewrite_context->add_created_expr(literal);
572
22
    return literal;
573
22
}
574
575
static bool rewrite_binary_slot_literal_predicate(
576
        const VExprSPtr& expr,
577
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
578
136
        RewriteContext* rewrite_context) {
579
136
    if (!is_binary_comparison_predicate(expr)) {
580
95
        return false;
581
95
    }
582
41
    auto children = expr->children();
583
41
    const VSlotRef* slot_ref = nullptr;
584
41
    const FileSlotRewriteInfo* rewrite_info =
585
41
            find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref);
586
41
    int slot_child_idx = 0;
587
41
    int literal_child_idx = 1;
588
41
    if (rewrite_info == nullptr) {
589
20
        rewrite_info = find_slot_rewrite_info(children[1], global_to_file_slot, &slot_ref);
590
20
        slot_child_idx = 1;
591
20
        literal_child_idx = 0;
592
20
    }
593
41
    if (rewrite_info == nullptr || slot_ref == nullptr) {
594
19
        return false;
595
19
    }
596
22
    auto literal_expr =
597
22
            unwrap_literal_for_file_cast(children[literal_child_idx], rewrite_info->table_type);
598
22
    if (literal_expr == nullptr) {
599
0
        return false;
600
0
    }
601
602
22
    auto rewritten_literal =
603
22
            rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
604
22
    if (rewritten_literal == nullptr) {
605
1
        children[literal_child_idx] = original_table_literal(literal_expr, rewrite_context);
606
1
        expr->set_children(std::move(children));
607
1
        return false;
608
1
    }
609
610
21
    children[slot_child_idx] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context);
611
21
    children[literal_child_idx] = std::move(rewritten_literal);
612
21
    expr->set_children(std::move(children));
613
21
    return true;
614
22
}
615
616
static bool rewrite_in_slot_literal_predicate(
617
        const VExprSPtr& expr,
618
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
619
115
        RewriteContext* rewrite_context) {
620
115
    if (expr->node_type() != TExprNodeType::IN_PRED || expr->get_num_children() < 2) {
621
109
        return false;
622
109
    }
623
6
    auto children = expr->children();
624
6
    const VSlotRef* slot_ref = nullptr;
625
6
    const FileSlotRewriteInfo* rewrite_info =
626
6
            find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref);
627
6
    if (rewrite_info == nullptr || slot_ref == nullptr) {
628
2
        return false;
629
2
    }
630
631
4
    VExprSPtrs rewritten_literals;
632
4
    rewritten_literals.reserve(children.size() - 1);
633
10
    for (size_t child_idx = 1; child_idx < children.size(); ++child_idx) {
634
7
        auto literal_expr =
635
7
                unwrap_literal_for_file_cast(children[child_idx], rewrite_info->table_type);
636
7
        if (literal_expr == nullptr) {
637
0
            return false;
638
0
        }
639
7
        auto rewritten_literal =
640
7
                rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
641
7
        if (rewritten_literal == nullptr) {
642
3
            for (size_t restore_idx = 1; restore_idx < children.size(); ++restore_idx) {
643
2
                auto restore_literal = unwrap_literal_for_file_cast(children[restore_idx],
644
2
                                                                    rewrite_info->table_type);
645
2
                if (restore_literal != nullptr) {
646
2
                    children[restore_idx] =
647
2
                            original_table_literal(restore_literal, rewrite_context);
648
2
                }
649
2
            }
650
1
            expr->set_children(std::move(children));
651
1
            return false;
652
1
        }
653
6
        rewritten_literals.push_back(std::move(rewritten_literal));
654
6
    }
655
656
3
    children[0] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context);
657
9
    for (size_t literal_idx = 0; literal_idx < rewritten_literals.size(); ++literal_idx) {
658
6
        children[literal_idx + 1] = std::move(rewritten_literals[literal_idx]);
659
6
    }
660
3
    expr->set_children(std::move(children));
661
3
    return true;
662
4
}
663
664
static VExprSPtr create_file_struct_child_name_literal(const std::string& file_child_name,
665
23
                                                       RewriteContext* rewrite_context) {
666
23
    auto literal = VLiteral::create_shared(std::make_shared<DataTypeString>(),
667
23
                                           Field::create_field<TYPE_STRING>(file_child_name));
668
23
    rewrite_context->add_created_expr(literal);
669
23
    return literal;
670
23
}
671
672
static bool needs_complex_file_slot_cast(const DataTypePtr& file_type,
673
9
                                         const DataTypePtr& table_type) {
674
9
    if (file_type == nullptr || table_type == nullptr || file_type->equals(*table_type)) {
675
0
        return false;
676
0
    }
677
9
    const auto file_nested_type = remove_nullable(file_type);
678
9
    const auto table_nested_type = remove_nullable(table_type);
679
9
    if (file_nested_type->equals(*table_nested_type)) {
680
0
        return false;
681
0
    }
682
9
    return is_complex_type(file_nested_type->get_primitive_type()) ||
683
9
           is_complex_type(table_nested_type->get_primitive_type());
684
9
}
685
686
23
static bool collect_struct_element_chain(const VExprSPtr& expr, std::vector<VExprSPtr>* chain) {
687
23
    DORIS_CHECK(chain != nullptr);
688
23
    if (!is_struct_element_expr(expr)) {
689
0
        return false;
690
0
    }
691
23
    const auto& parent = expr->children()[0];
692
23
    if (is_struct_element_expr(parent)) {
693
3
        if (!collect_struct_element_chain(parent, chain)) {
694
0
            return false;
695
0
        }
696
20
    } else if (!parent->is_slot_ref()) {
697
        // Only support file-local rewrite for struct child chains rooted directly at a top-level
698
        // slot, for example `element_at(s, 'a')` or `element_at(element_at(s, 'a'), 'b')`.
699
        //
700
        // Do not localize computed complex parents such as
701
        // `element_at(element_at(map_values(m), 1), 'full_name')`. The intermediate map/array
702
        // result has already been reshaped by scan projection and may have a different child order
703
        // from the table expression. Partially rewriting that expression against the file block can
704
        // silently evaluate the wrong struct child and filter out valid rows. Those predicates must
705
        // remain as table-level conjuncts and be evaluated after TableReader materialization.
706
0
        return false;
707
0
    }
708
23
    chain->push_back(expr);
709
23
    return true;
710
23
}
711
712
static bool rewrite_struct_element_path_to_file_expr(
713
        const VExprSPtr& expr, const std::vector<ColumnMapping>& mappings,
714
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
715
23
        RewriteContext* rewrite_context) {
716
23
    ResolvedNestedStructPath resolved;
717
23
    if (!resolve_nested_struct_expr_for_file(expr, mappings, &resolved)) {
718
3
        return false;
719
3
    }
720
721
20
    std::vector<VExprSPtr> struct_element_chain;
722
20
    if (!collect_struct_element_chain(expr, &struct_element_chain) ||
723
20
        struct_element_chain.size() != resolved.file_child_names.size() ||
724
20
        struct_element_chain.size() != resolved.file_child_types.size()) {
725
0
        return false;
726
0
    }
727
728
20
    auto root_children = struct_element_chain.front()->children();
729
20
    if (!root_children[0]->is_slot_ref()) {
730
0
        return false;
731
0
    }
732
20
    const auto* slot_ref = assert_cast<const VSlotRef*>(root_children[0].get());
733
20
    const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
734
20
    if (rewrite_it == global_to_file_slot.end()) {
735
0
        return false;
736
0
    }
737
738
    // File-local conjuncts are prepared against the file-reader Block, so both the root slot and
739
    // every struct selector must be expressed in file schema terms. For a renamed Iceberg field,
740
    // keeping the table selector would prepare `element_at(file_struct<rename>, 'renamed')` and
741
    // fail before any rows are read. Rewrite the whole chain while ColumnMapping still preserves
742
    // the table-to-file relationship. Example:
743
    //   table filter: element_at(element_at(s, 'renamed_parent'), 'renamed_leaf')
744
    //   old file:     s<parent<leaf>>
745
    //   file filter:  element_at(element_at(s, 'parent'), 'leaf')
746
20
    root_children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
747
20
    struct_element_chain.front()->set_children(std::move(root_children));
748
43
    for (size_t idx = 0; idx < struct_element_chain.size(); ++idx) {
749
23
        auto children = struct_element_chain[idx]->children();
750
23
        children[1] = create_file_struct_child_name_literal(resolved.file_child_names[idx],
751
23
                                                            rewrite_context);
752
23
        struct_element_chain[idx]->set_children(std::move(children));
753
        // The selector name and the expression return type must be moved to file schema together.
754
        // Example:
755
        //   table filter: element_at(element_at(s, 'new_a'), 'new_aa') = 50
756
        //   old file:     s.new_a STRUCT<aa, bb>
757
        //   file filter:  element_at(element_at(s, 'new_a'), 'aa') = 50
758
        //
759
        // If the inner element_at keeps the table return type STRUCT<new_aa, bb>, preparing the
760
        // outer element_at(..., 'aa') fails before scanning because `aa` is not a table field.
761
23
        struct_element_chain[idx]->data_type() = resolved.file_child_types[idx];
762
23
    }
763
20
    return true;
764
20
}
765
766
static VExprSPtr rewrite_table_expr_to_file_expr(
767
        const VExprSPtr& expr,
768
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
769
        const std::vector<ColumnMapping>& filter_mappings, RewriteContext* rewrite_context,
770
137
        bool* can_localize) {
771
137
    if (expr == nullptr) {
772
0
        return nullptr;
773
0
    }
774
137
    DORIS_CHECK(rewrite_context != nullptr);
775
137
    DORIS_CHECK(can_localize != nullptr);
776
137
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr.get());
777
137
        runtime_filter != nullptr) {
778
1
        auto impl = runtime_filter->get_impl();
779
1
        if (impl == nullptr) {
780
0
            *can_localize = false;
781
0
            return expr;
782
0
        }
783
1
        auto localized_impl = rewrite_table_expr_to_file_expr(
784
1
                impl, global_to_file_slot, filter_mappings, rewrite_context, can_localize);
785
1
        if (!*can_localize) {
786
0
            return expr;
787
0
        }
788
1
        runtime_filter->set_impl(std::move(localized_impl));
789
1
        return expr;
790
1
    }
791
136
    if (rewrite_binary_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) {
792
21
        return expr;
793
21
    }
794
115
    if (rewrite_in_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) {
795
3
        return expr;
796
3
    }
797
112
    if (is_struct_element_expr(expr)) {
798
23
        if (!rewrite_struct_element_path_to_file_expr(expr, filter_mappings, global_to_file_slot,
799
23
                                                      rewrite_context)) {
800
            // The scanner still evaluates the original table-level conjunct after TableReader
801
            // finalizes the output block. Skipping an unlocalizable file conjunct is therefore
802
            // safer than preparing a partially rewritten expression against the wrong struct
803
            // layout. In particular, do not generate file-local conjuncts for computed complex
804
            // parents such as `element_at(element_at(map_values(m), 1), 'field')`; only direct
805
            // slot-rooted struct chains are supported here.
806
3
            *can_localize = false;
807
3
        }
808
23
        return expr;
809
23
    }
810
89
    if (expr->is_slot_ref()) {
811
15
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
812
15
        const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
813
15
        if (rewrite_it != global_to_file_slot.end()) {
814
15
            const auto& rewrite_info = rewrite_it->second;
815
15
            auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info, rewrite_context);
816
15
            if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
817
7
                return file_slot;
818
7
            }
819
8
            if (needs_complex_file_slot_cast(rewrite_info.file_type, rewrite_info.table_type)) {
820
                // Generic file-local expressions cannot safely cast an evolved complex file slot
821
                // back to the table type. Example:
822
                //
823
                //   table filter: ARRAY_CONTAINS(MAP_KEYS(m), 'person5')
824
                //   old file:     m MAP<STRING, STRUCT<name, age>>
825
                //   table:        m MAP<STRING, STRUCT<age, full_name, gender>>
826
                //
827
                // Although MAP_KEYS only reads the key column, wrapping the file slot as
828
                // `CAST(file_m AS table_m)` forces the value struct cast first and fails because
829
                // the old and new value structs have different fields. Keep such filters at the
830
                // table level, where TableReader materializes the evolved complex value before
831
                // Scanner evaluates the original conjunct. Direct slot-rooted struct child paths
832
                // are handled by rewrite_struct_element_path_to_file_expr() above.
833
1
                *can_localize = false;
834
1
                return expr;
835
1
            }
836
7
            auto cast_expr = Cast::create_shared(rewrite_info.table_type);
837
7
            cast_expr->add_child(std::move(file_slot));
838
7
            rewrite_context->add_created_expr(cast_expr);
839
7
            return cast_expr;
840
8
        }
841
0
        return expr;
842
15
    }
843
    // The input is a split-local cloned tree. A previous split-local clone may already have
844
    // inserted Cast(slot). Keep that rewrite idempotent: rewrite the cast child from table slot to
845
    // the current split's file slot, and drop the cast when the current split no longer needs it.
846
74
    if (is_cast_expr(expr) && expr->get_num_children() == 1) {
847
4
        const auto& child = expr->children()[0];
848
4
        if (child->is_slot_ref()) {
849
2
            const auto* slot_ref = assert_cast<const VSlotRef*>(child.get());
850
2
            const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
851
2
            if (rewrite_it != global_to_file_slot.end() &&
852
2
                expr->data_type()->equals(*rewrite_it->second.table_type)) {
853
1
                auto rewritten_child =
854
1
                        create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
855
1
                if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
856
0
                    return rewritten_child;
857
0
                }
858
1
                if (needs_complex_file_slot_cast(rewrite_it->second.file_type,
859
1
                                                 rewrite_it->second.table_type)) {
860
0
                    *can_localize = false;
861
0
                    return expr;
862
0
                }
863
1
                expr->set_children({std::move(rewritten_child)});
864
1
                return expr;
865
1
            }
866
2
        }
867
4
    }
868
869
73
    VExprSPtrs rewritten_children;
870
73
    rewritten_children.reserve(expr->children().size());
871
78
    for (const auto& child : expr->children()) {
872
78
        rewritten_children.push_back(rewrite_table_expr_to_file_expr(
873
78
                child, global_to_file_slot, filter_mappings, rewrite_context, can_localize));
874
78
    }
875
73
    expr->set_children(std::move(rewritten_children));
876
73
    return expr;
877
74
}
878
879
static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
880
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_updated_sequence_number";
881
static constexpr int32_t ROW_LINEAGE_ROW_ID_FIELD_ID = 2147483540;
882
static constexpr int32_t ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER_FIELD_ID = 2147483539;
883
884
155
static TableVirtualColumnType row_lineage_virtual_column_type(const std::string& column_name) {
885
155
    if (column_name == ROW_LINEAGE_ROW_ID) {
886
2
        return TableVirtualColumnType::ROW_ID;
887
2
    }
888
153
    if (column_name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
889
2
        return TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
890
2
    }
891
151
    return TableVirtualColumnType::INVALID;
892
153
}
893
894
static TableVirtualColumnType row_lineage_virtual_column_type_by_field_id(
895
75
        const ColumnDefinition& column) {
896
75
    if (!column.has_identifier_field_id()) {
897
5
        return TableVirtualColumnType::INVALID;
898
5
    }
899
70
    switch (column.get_identifier_field_id()) {
900
10
    case ROW_LINEAGE_ROW_ID_FIELD_ID:
901
10
        return TableVirtualColumnType::ROW_ID;
902
9
    case ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER_FIELD_ID:
903
9
        return TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
904
51
    default:
905
51
        return TableVirtualColumnType::INVALID;
906
70
    }
907
70
}
908
909
static TableVirtualColumnType row_lineage_virtual_column_type(const ColumnDefinition& column,
910
230
                                                              TableColumnMappingMode mode) {
911
230
    switch (mode) {
912
75
    case TableColumnMappingMode::BY_FIELD_ID:
913
75
        return row_lineage_virtual_column_type_by_field_id(column);
914
133
    case TableColumnMappingMode::BY_NAME:
915
155
    case TableColumnMappingMode::BY_INDEX:
916
155
        return row_lineage_virtual_column_type(column.name);
917
230
    }
918
0
    return TableVirtualColumnType::INVALID;
919
230
}
920
921
// Returns true when the current file type is not the exact nested type the scan should expose.
922
// This is about building the projected file-side type/projection, not about whether TableReader
923
// later needs to rematerialize the complex value back to table layout.
924
104
static bool needs_projected_file_type_rebuild(const ColumnMapping& mapping) {
925
104
    if (!is_complex_type(mapping.file_type->get_primitive_type())) {
926
26
        return false;
927
26
    }
928
78
    if (mapping.child_mappings.empty()) {
929
0
        return false;
930
0
    }
931
78
    DORIS_CHECK(mapping.file_type != nullptr);
932
78
    DORIS_CHECK(mapping.table_type != nullptr);
933
78
    if (remove_nullable(mapping.file_type)->get_primitive_type() !=
934
78
        remove_nullable(mapping.table_type)->get_primitive_type()) {
935
0
        return true;
936
0
    }
937
78
    if (!mapping.table_type->equals(*mapping.file_type)) {
938
61
        return true;
939
61
    }
940
30
    for (const auto& child_mapping : mapping.child_mappings) {
941
        // Rename-only child mappings do not change the file-side projected shape. If field-id
942
        // matching maps table child `renamed_b` to file child `b`, the file reader can still expose
943
        // the original file type as long as child count/order/types are unchanged.
944
30
        if (!child_mapping.file_local_id.has_value() ||
945
30
            needs_projected_file_type_rebuild(child_mapping)) {
946
1
            return true;
947
1
        }
948
30
    }
949
16
    return false;
950
17
}
951
952
static std::optional<size_t> file_child_ordinal_in_scan_type(const ColumnMapping& mapping,
953
76
                                                             const ColumnMapping& child_mapping) {
954
76
    if (!child_mapping.file_local_id.has_value()) {
955
1
        return std::nullopt;
956
1
    }
957
75
    const auto& file_children = !mapping.projected_file_children.empty()
958
75
                                        ? mapping.projected_file_children
959
75
                                        : mapping.original_file_children;
960
90
    const auto child_it = std::ranges::find_if(file_children, [&](const ColumnDefinition& child) {
961
90
        return child.file_local_id() == *child_mapping.file_local_id;
962
90
    });
963
75
    if (child_it == file_children.end()) {
964
0
        return std::nullopt;
965
0
    }
966
75
    return static_cast<size_t>(std::distance(file_children.begin(), child_it));
967
75
}
968
969
378
static bool needs_complex_rematerialize(const ColumnMapping& mapping) {
970
378
    if (mapping.child_mappings.empty()) {
971
289
        return false;
972
289
    }
973
89
    if (mapping.table_type == nullptr || mapping.file_type == nullptr ||
974
89
        !mapping.table_type->equals(*mapping.file_type)) {
975
24
        return true;
976
24
    }
977
133
    for (size_t table_child_idx = 0; table_child_idx < mapping.child_mappings.size();
978
76
         ++table_child_idx) {
979
76
        const auto& child_mapping = mapping.child_mappings[table_child_idx];
980
76
        const auto file_child_idx = file_child_ordinal_in_scan_type(mapping, child_mapping);
981
76
        if (!file_child_idx.has_value() || *file_child_idx != table_child_idx ||
982
76
            needs_complex_rematerialize(child_mapping) ||
983
76
            (child_mapping.table_type != nullptr && child_mapping.file_type != nullptr &&
984
68
             !child_mapping.table_type->equals(*child_mapping.file_type))) {
985
8
            return true;
986
8
        }
987
76
    }
988
57
    return false;
989
65
}
990
991
397
static bool mapping_can_use_file_column_directly(const ColumnMapping& mapping) {
992
397
    if (mapping.table_type == nullptr || mapping.file_type == nullptr) {
993
0
        return false;
994
0
    }
995
397
    const auto table_type = remove_nullable(mapping.table_type);
996
397
    const auto file_type = remove_nullable(mapping.file_type);
997
397
    const bool same_timestamptz_with_different_scale =
998
397
            table_type->get_primitive_type() == TYPE_TIMESTAMPTZ &&
999
397
            file_type->get_primitive_type() == TYPE_TIMESTAMPTZ;
1000
397
    if (!mapping.table_type->equals(*mapping.file_type) && !same_timestamptz_with_different_scale) {
1001
131
        return false;
1002
131
    }
1003
266
    return !needs_complex_rematerialize(mapping);
1004
397
}
1005
1006
static const ColumnDefinition* find_file_child_for_mapping(const ColumnDefinition& table_child,
1007
                                                           const ColumnDefinition& file_parent,
1008
                                                           TableColumnMappingMode mode,
1009
                                                           size_t table_child_idx,
1010
118
                                                           bool allow_ordinal_fallback) {
1011
118
    const auto file_parent_type = remove_nullable(file_parent.type)->get_primitive_type();
1012
118
    switch (file_parent_type) {
1013
9
    case TYPE_ARRAY:
1014
9
        DORIS_CHECK(file_parent.children.size() == 1);
1015
9
        return &file_parent.children[0];
1016
19
    case TYPE_MAP:
1017
19
        DORIS_CHECK(file_parent.children.size() == 2);
1018
19
        if (table_child.name == "key") {
1019
6
            return &file_parent.children[0];
1020
6
        }
1021
13
        if (table_child.name == "value") {
1022
13
            return &file_parent.children[1];
1023
13
        }
1024
0
        if (table_child.local_id == 0 || table_child.local_id == 1) {
1025
0
            return &file_parent.children[table_child.local_id];
1026
0
        }
1027
0
        return nullptr;
1028
90
    default:
1029
        // Hive BY_INDEX is a top-level column matching rule. Once a complex root is selected by
1030
        // file position, nested struct children follow Hive reader's historical name matching
1031
        // semantics; their integer identifiers can be field ids, not file positions.
1032
90
        const auto nested_mode =
1033
90
                mode == TableColumnMappingMode::BY_INDEX ? TableColumnMappingMode::BY_NAME : mode;
1034
90
        if (const auto* file_child =
1035
90
                    matcher_for_mode(nested_mode).find(table_child, file_parent.children);
1036
90
            file_child != nullptr) {
1037
71
            return file_child;
1038
71
        }
1039
19
        if (allow_ordinal_fallback && mode == TableColumnMappingMode::BY_FIELD_ID &&
1040
19
            !table_child.has_identifier_field_id()) {
1041
            // Synthetic children are derived from the table DataType when nested ColumnDefinition
1042
            // metadata has been pruned away. They do not carry Iceberg field ids, so try a name
1043
            // match before falling back to ordinal order. Example:
1044
            //   table value type: Struct(age, full_name, gender)
1045
            //   old file value:   Struct(name, age)
1046
            // Name matching keeps `age -> age`; the later unused-child fallback can then map the
1047
            // renamed `full_name -> name` instead of consuming `age` twice.
1048
3
            if (const auto* file_child = NameMatcher().find(table_child, file_parent.children);
1049
3
                file_child != nullptr) {
1050
1
                return file_child;
1051
1
            }
1052
3
        }
1053
        // Some callers only carry the full complex DataType for a projected table column, without
1054
        // expanded nested ColumnDefinitions. In that case we can still preserve full materialization
1055
        // by walking table/file struct fields by ordinal. This is a fallback only: explicit
1056
        // ColumnDefinition children keep using the requested table-format matching rule, which is
1057
        // required for precise schema evolution.
1058
18
        if (allow_ordinal_fallback && table_child_idx < file_parent.children.size()) {
1059
3
            return &file_parent.children[table_child_idx];
1060
3
        }
1061
15
        return nullptr;
1062
118
    }
1063
118
}
1064
1065
static ColumnDefinition synthetic_child_definition(const std::string& name, DataTypePtr type,
1066
13
                                                   int32_t local_id) {
1067
13
    ColumnDefinition child;
1068
13
    child.identifier = Field::create_field<TYPE_STRING>(name);
1069
13
    child.local_id = local_id;
1070
13
    child.name = name;
1071
13
    child.type = std::move(type);
1072
13
    return child;
1073
13
}
1074
1075
static std::vector<ColumnDefinition> synthesize_complex_children_from_type(
1076
4
        const DataTypePtr& type) {
1077
4
    std::vector<ColumnDefinition> children;
1078
4
    if (type == nullptr) {
1079
0
        return children;
1080
0
    }
1081
4
    const auto nested_type = remove_nullable(type);
1082
4
    switch (nested_type->get_primitive_type()) {
1083
0
    case TYPE_ARRAY: {
1084
0
        const auto* array_type = assert_cast<const DataTypeArray*>(nested_type.get());
1085
0
        children.push_back(synthetic_child_definition("element", array_type->get_nested_type(), 0));
1086
0
        break;
1087
0
    }
1088
1
    case TYPE_MAP: {
1089
1
        const auto* map_type = assert_cast<const DataTypeMap*>(nested_type.get());
1090
1
        children.push_back(synthetic_child_definition("key", map_type->get_key_type(), 0));
1091
1
        children.push_back(synthetic_child_definition("value", map_type->get_value_type(), 1));
1092
1
        break;
1093
0
    }
1094
3
    case TYPE_STRUCT: {
1095
3
        const auto* struct_type = assert_cast<const DataTypeStruct*>(nested_type.get());
1096
3
        children.reserve(struct_type->get_elements().size());
1097
12
        for (size_t idx = 0; idx < struct_type->get_elements().size(); ++idx) {
1098
9
            children.push_back(synthetic_child_definition(struct_type->get_element_name(idx),
1099
9
                                                          struct_type->get_element(idx),
1100
9
                                                          cast_set<int32_t>(idx)));
1101
9
        }
1102
3
        break;
1103
0
    }
1104
0
    default:
1105
0
        break;
1106
4
    }
1107
4
    return children;
1108
4
}
1109
1110
static bool has_table_child_named(const std::vector<ColumnDefinition>& children,
1111
15
                                  std::string_view name) {
1112
17
    return std::ranges::any_of(children, [&](const ColumnDefinition& child) {
1113
17
        return std::string_view(child.name) == name;
1114
17
    });
1115
15
}
1116
1117
static void complete_required_complex_children_from_type(const DataTypePtr& type,
1118
57
                                                         std::vector<ColumnDefinition>* children) {
1119
57
    DORIS_CHECK(children != nullptr);
1120
57
    if (type == nullptr) {
1121
0
        return;
1122
0
    }
1123
57
    const auto nested_type = remove_nullable(type);
1124
57
    switch (nested_type->get_primitive_type()) {
1125
11
    case TYPE_MAP: {
1126
11
        const auto* map_type = assert_cast<const DataTypeMap*>(nested_type.get());
1127
        // MAP key/value are structural children, not independently materializable table fields.
1128
        // A key-only projection can still be attached to a whole-map output root, for example:
1129
        //   SELECT * FROM t WHERE ARRAY_CONTAINS(MAP_KEYS(new_map_column), 'person5')
1130
        //
1131
        // In that shape the scanner keeps the value stream readable, but the table projection can
1132
        // carry only the key child. Add the missing value child so recursive mapping can evolve the
1133
        // value type instead of letting TableReader cast old/new value structs directly.
1134
11
        if (has_table_child_named(*children, "key") && !has_table_child_named(*children, "value")) {
1135
2
            children->push_back(synthetic_child_definition("value", map_type->get_value_type(), 1));
1136
2
        }
1137
11
        break;
1138
0
    }
1139
8
    case TYPE_ARRAY:
1140
        // ARRAY has only one required structural child (`element`), so a non-empty projection is
1141
        // already rooted at the element path.
1142
8
        break;
1143
38
    case TYPE_STRUCT:
1144
        // STRUCT children are real fields and must remain prunable. Completing missing struct
1145
        // fields here would turn `SELECT s.a` into a full-struct read and undo nested projection.
1146
38
        break;
1147
0
    default:
1148
0
        break;
1149
57
    }
1150
57
}
1151
1152
76
static Status validate_file_schema_children(const ColumnDefinition& file_field) {
1153
76
    if (file_field.type == nullptr) {
1154
0
        return Status::InternalError("File column '{}' has null type", file_field.name);
1155
0
    }
1156
76
    const auto nested_type = remove_nullable(file_field.type);
1157
76
    size_t expected_children = 0;
1158
76
    bool complex_with_fixed_children = true;
1159
76
    switch (nested_type->get_primitive_type()) {
1160
9
    case TYPE_ARRAY:
1161
9
        expected_children = 1;
1162
9
        break;
1163
13
    case TYPE_MAP:
1164
13
        expected_children = 2;
1165
13
        break;
1166
54
    case TYPE_STRUCT:
1167
54
        expected_children =
1168
54
                assert_cast<const DataTypeStruct*>(nested_type.get())->get_elements().size();
1169
54
        break;
1170
0
    default:
1171
0
        complex_with_fixed_children = false;
1172
0
        break;
1173
76
    }
1174
76
    if (!complex_with_fixed_children || file_field.children.size() == expected_children) {
1175
75
        return Status::OK();
1176
75
    }
1177
1
    return Status::InternalError(
1178
1
            "Malformed complex file schema for column '{}': type={}, expected_children={}, "
1179
1
            "actual_children={}",
1180
1
            file_field.name, file_field.type->get_name(), expected_children,
1181
1
            file_field.children.size());
1182
76
}
1183
1184
183
static bool has_projected_file_children(const ColumnMapping& mapping) {
1185
183
    if (mapping.original_file_children.empty() || mapping.projected_file_children.empty()) {
1186
137
        return false;
1187
137
    }
1188
46
    if (mapping.original_file_children.size() != mapping.projected_file_children.size()) {
1189
21
        return true;
1190
21
    }
1191
65
    for (size_t idx = 0; idx < mapping.original_file_children.size(); ++idx) {
1192
40
        if (mapping.original_file_children[idx].file_local_id() !=
1193
40
            mapping.projected_file_children[idx].file_local_id()) {
1194
0
            return true;
1195
0
        }
1196
40
    }
1197
25
    return false;
1198
25
}
1199
1200
183
static bool needs_nested_file_projection(const ColumnMapping& mapping) {
1201
183
    if (has_projected_file_children(mapping)) {
1202
        // Return True if the projected child column is missing / re-ordered
1203
21
        return true;
1204
21
    }
1205
162
    return std::ranges::any_of(mapping.child_mappings, [](const ColumnMapping& child_mapping) {
1206
40
        return needs_nested_file_projection(child_mapping);
1207
40
    });
1208
183
}
1209
1210
static Status build_complex_projection(const ColumnMapping& mapping, LocalColumnIndex* projection);
1211
1212
// Build the projected file children/type according to the pruned complex projection. For example,
1213
// if we have a struct column `s` with children `id` and `name`, and the projection only keeps
1214
// `s.name`, then the file reader should expose `STRUCT<name ...>`.
1215
static Status rebuild_projected_file_children_and_type(
1216
        const DataTypePtr& file_type, const std::vector<ColumnDefinition>& original_file_children,
1217
        const std::vector<ColumnMapping>& child_mappings,
1218
62
        std::vector<ColumnDefinition>* projected_file_children, DataTypePtr* projected_type) {
1219
62
    DORIS_CHECK(file_type != nullptr);
1220
62
    DORIS_CHECK(projected_file_children != nullptr);
1221
62
    DORIS_CHECK(projected_type != nullptr);
1222
62
    ColumnDefinition field;
1223
62
    field.type = file_type;
1224
62
    field.children = original_file_children;
1225
62
    LocalColumnIndex projection = LocalColumnIndex::partial_local(-1);
1226
62
    projection.children.reserve(child_mappings.size());
1227
80
    for (const auto* child_mapping : present_child_mappings_in_file_order(child_mappings)) {
1228
80
        DORIS_CHECK(child_mapping->file_local_id.has_value());
1229
80
        LocalColumnIndex child_projection;
1230
80
        RETURN_IF_ERROR(build_complex_projection(*child_mapping, &child_projection));
1231
80
        projection.children.push_back(std::move(child_projection));
1232
80
    }
1233
1234
62
    ColumnDefinition projected_field;
1235
62
    RETURN_IF_ERROR(project_column_definition(field, projection, &projected_field));
1236
62
    *projected_file_children = std::move(projected_field.children);
1237
62
    *projected_type = std::move(projected_field.type);
1238
62
    return Status::OK();
1239
62
}
1240
1241
// Build the complex column projection according to the ColumnMapping which is re-ordered by the
1242
// file-schema's order.
1243
//
1244
// For MAP, a partial projection represents value-subtree pruning only. The key child is not a
1245
// projected output shape; file readers still read full keys to construct ColumnMap offsets and keep
1246
// key semantics unchanged. If a caller tries to project only/prune the key child, the common schema
1247
// projection helper rejects it.
1248
160
static Status build_complex_projection(const ColumnMapping& mapping, LocalColumnIndex* projection) {
1249
160
    if (projection == nullptr) {
1250
0
        return Status::InvalidArgument("projection is null");
1251
0
    }
1252
160
    DORIS_CHECK(mapping.file_local_id.has_value());
1253
160
    *projection = LocalColumnIndex::local(*mapping.file_local_id);
1254
160
    projection->project_all_children = mapping.child_mappings.empty();
1255
160
    projection->children.clear();
1256
160
    const auto present_children = present_child_mappings_in_file_order(mapping.child_mappings);
1257
160
    if (!projection->project_all_children && present_children.empty()) {
1258
        // All requested table children under this complex node are missing/default-only. The file
1259
        // reader cannot expose an empty complex projection, but TableReader can still rematerialize
1260
        // the table shape from a full file subtree and fill the missing children with defaults.
1261
4
        projection->project_all_children = true;
1262
4
        return Status::OK();
1263
4
    }
1264
156
    for (const auto* child_mapping : present_children) {
1265
59
        LocalColumnIndex child_projection;
1266
59
        RETURN_IF_ERROR(build_complex_projection(*child_mapping, &child_projection));
1267
59
        projection->children.push_back(std::move(child_projection));
1268
59
    }
1269
156
    if (!projection->project_all_children && projection->children.empty()) {
1270
0
        return Status::NotSupported("Projection for complex column {} contains no file children",
1271
0
                                    mapping.file_column_name);
1272
0
    }
1273
156
    return Status::OK();
1274
156
}
1275
1276
using FilterProjectionMap = std::map<LocalColumnId, LocalColumnIndex>;
1277
1278
// Update the mapping's file type according to the projection, and determine whether the projection
1279
// is trivial (i.e. the projected file type is the same as the table type, so no need to
1280
// rematerialize the complex value back to table layout after reading from file).
1281
static Status apply_projection_to_mapping_file_type(const LocalColumnIndex& projection,
1282
149
                                                    ColumnMapping* mapping) {
1283
149
    DORIS_CHECK(mapping != nullptr);
1284
149
    if (mapping->original_file_type == nullptr) {
1285
0
        mapping->original_file_type = mapping->file_type;
1286
0
    }
1287
149
    if (mapping->original_file_type == nullptr ||
1288
149
        !is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())) {
1289
106
        return Status::OK();
1290
106
    }
1291
43
    ColumnDefinition field;
1292
43
    field.type = mapping->original_file_type;
1293
43
    field.children = mapping->original_file_children;
1294
43
    ColumnDefinition projected_field;
1295
43
    RETURN_IF_ERROR(project_column_definition(field, projection, &projected_field));
1296
43
    mapping->file_type = std::move(projected_field.type);
1297
43
    mapping->projected_file_children = std::move(projected_field.children);
1298
43
    mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
1299
43
    return Status::OK();
1300
43
}
1301
1302
static Status merge_filter_projection(const FilterProjectionMap* filter_projections,
1303
61
                                      LocalColumnIndex* projection) {
1304
61
    DORIS_CHECK(projection != nullptr);
1305
61
    if (filter_projections == nullptr) {
1306
0
        return Status::OK();
1307
0
    }
1308
61
    const auto filter_projection_it = filter_projections->find(projection->column_id());
1309
61
    if (filter_projection_it == filter_projections->end()) {
1310
46
        return Status::OK();
1311
46
    }
1312
    // Merge predicate-only nested paths into the root projection that is about to be scanned.
1313
    // Example: `SELECT s.a WHERE s.b > 1` first builds the output projection `s -> a` from
1314
    // ColumnMapping, while build_nested_struct_filter_projection_map() records `s -> b`. This merge
1315
    // produces one file scan projection `s -> a,b`.
1316
15
    RETURN_IF_ERROR(merge_local_column_index(projection, filter_projection_it->second));
1317
15
    return Status::OK();
1318
15
}
1319
1320
2
static bool table_root_is_map(const ColumnMapping& mapping) {
1321
2
    if (mapping.table_type == nullptr) {
1322
0
        return false;
1323
0
    }
1324
2
    return remove_nullable(mapping.table_type)->get_primitive_type() == TYPE_MAP;
1325
2
}
1326
1327
static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapping,
1328
                              bool is_predicate_column, bool force_full_complex_scan_projection,
1329
150
                              const FilterProjectionMap* filter_projections = nullptr) {
1330
150
    const auto file_column_id = LocalColumnId(mapping->file_local_id.value());
1331
150
    LocalColumnIndex projection = LocalColumnIndex::top_level(file_column_id);
1332
    // Columnar readers can turn a complex mapping into a nested file projection, but
1333
    // row-oriented readers must scan the full top-level complex field because all children are
1334
    // encoded in the same text cell.
1335
150
    if (!force_full_complex_scan_projection && needs_nested_file_projection(*mapping)) {
1336
21
        RETURN_IF_ERROR(build_complex_projection(*mapping, &projection));
1337
21
    }
1338
150
    if (is_predicate_column && !force_full_complex_scan_projection) {
1339
61
        DCHECK(filter_projections != nullptr);
1340
        // If a projected complex root is also used by a predicate, rebuild the predicate scan
1341
        // projection from the output mapping before merging predicate-only children. For
1342
        // `SELECT s.a WHERE s.b > 1`, build_complex_projection() produces `s -> a` and
1343
        // merge_filter_projection() adds `s -> b`, so the predicate column reads both children.
1344
61
        RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection));
1345
61
    }
1346
150
    FileScanRequestBuilder builder(file_request);
1347
150
    if (is_predicate_column) {
1348
61
        return builder.add_predicate_column(std::move(projection));
1349
61
    }
1350
89
    return builder.add_non_predicate_column(std::move(projection));
1351
150
}
1352
1353
static const LocalColumnIndex* find_scan_projection(
1354
238
        const std::vector<LocalColumnIndex>& scan_columns, LocalColumnId file_column_id) {
1355
238
    const auto projection_it =
1356
238
            std::ranges::find_if(scan_columns, [&](const LocalColumnIndex& projection) {
1357
189
                return projection.column_id() == file_column_id;
1358
189
            });
1359
238
    return projection_it == scan_columns.end() ? nullptr : &*projection_it;
1360
238
}
1361
1362
// Apply the final scan projection of one root file column back to its ColumnMapping. This updates
1363
// mapping.file_type/projected_file_children from the original file schema to the exact shape that
1364
// FileReader will return.
1365
//
1366
// Example: for `SELECT s.a WHERE s.b > 1`, add_scan_column() keeps only one predicate scan
1367
// projection `s -> a,b`. Applying that projection changes the mapping's file type from the full
1368
// file struct `s<a,b,c>` to the projected file struct `s<a,b>`, so later filter rewrite and
1369
// TableReader final materialization use the same column shape as the file-local block.
1370
static Status apply_scan_projection_to_mapping_file_type(const FileScanRequest& file_request,
1371
149
                                                         ColumnMapping* mapping) {
1372
149
    DORIS_CHECK(mapping != nullptr);
1373
149
    DORIS_CHECK(mapping->file_local_id.has_value());
1374
149
    const auto file_column_id = LocalColumnId(*mapping->file_local_id);
1375
    // Predicate columns are the actual scan projection when a column is used by row-level filters:
1376
    // add_scan_column() removes the duplicate non-predicate projection in that case.
1377
149
    const auto* projection = find_scan_projection(file_request.predicate_columns, file_column_id);
1378
149
    if (projection == nullptr) {
1379
89
        projection = find_scan_projection(file_request.non_predicate_columns, file_column_id);
1380
89
    }
1381
149
    DORIS_CHECK(projection != nullptr);
1382
149
    return apply_projection_to_mapping_file_type(*projection, mapping);
1383
149
}
1384
1385
// Build extra scan projections required only by row-level filters on nested struct children.
1386
//
1387
// Example: for `SELECT s.a FROM t WHERE s.b.c > 1`, the output projection may only contain `s.a`,
1388
// but the file reader must also read `s.b.c` to evaluate the predicate. This function collects the
1389
// table-side filter path, resolves it through ColumnMapping first, and records the corresponding
1390
// file-side projection in filter_projections. This keeps renamed fields consistent across the scan
1391
// projection, row-level conjunct rewrite, and nested predicate pruning. Example:
1392
//   table filter path: s -> renamed_b -> c
1393
//   old file path:     s -> b -> c
1394
//   recorded path:     s -> b -> c
1395
// When add_scan_column() adds the same root as a predicate column, it rebuilds that root from the
1396
// output mapping, merges this filter-only projection into it, and removes the duplicate
1397
// non-predicate root entry.
1398
static Status build_nested_struct_filter_projection_map(
1399
        const std::vector<TableFilter>& table_filters, const std::vector<ColumnMapping>& mappings,
1400
130
        FilterProjectionMap* filter_projections) {
1401
130
    DORIS_CHECK(filter_projections != nullptr);
1402
130
    filter_projections->clear();
1403
130
    for (const auto& table_filter : table_filters) {
1404
71
        if (table_filter.conjunct == nullptr) {
1405
2
            continue;
1406
2
        }
1407
        // Collect all nested struct paths in the table filter. For example, for
1408
        // `s.id > 5 AND element_at(s, 'renamed_name') = 'abc'`, collect the table paths
1409
        // `s -> id` and `s -> renamed_name`, then resolve each one to its file-side projection.
1410
69
        std::vector<NestedStructPath> paths;
1411
69
        collect_nested_struct_paths(table_filter.conjunct->root(), &paths);
1412
69
        for (const auto& path : paths) {
1413
23
            auto mapping_it = std::ranges::find_if(mappings, [&](const ColumnMapping& mapping) {
1414
23
                return mapping.global_index == path.root_global_index;
1415
23
            });
1416
22
            if (mapping_it == mappings.end() || !mapping_it->file_local_id.has_value() ||
1417
22
                path.selectors.empty()) {
1418
0
                continue;
1419
0
            }
1420
1421
22
            ResolvedNestedStructPath resolved;
1422
22
            LocalColumnIndex root_projection;
1423
22
            if (!resolve_nested_struct_path_for_file(path, mappings, &resolved)) {
1424
2
                if (!table_root_is_map(*mapping_it)) {
1425
1
                    continue;
1426
1
                }
1427
                // Direct map value filters such as `m.value.a > 1` need the value leaf for row
1428
                // evaluation even when the query only projects another value child. This is only a
1429
                // scan projection fallback; complex map/array expressions are still not rewritten
1430
                // into file-local conjuncts.
1431
1
                LocalColumnIndex child_projection;
1432
1
                RETURN_IF_ERROR(build_file_child_projection_from_schema(
1433
1
                        mapping_it->original_file_children, path.selectors, &child_projection));
1434
1
                if (child_projection.local_id() < 0) {
1435
0
                    continue;
1436
0
                }
1437
1
                root_projection = LocalColumnIndex::partial_local(*mapping_it->file_local_id);
1438
1
                root_projection.children.push_back(std::move(child_projection));
1439
20
            } else {
1440
20
                root_projection = std::move(resolved.file_projection);
1441
20
            }
1442
21
            auto filter_projection_it = filter_projections->find(root_projection.column_id());
1443
21
            if (filter_projection_it == filter_projections->end()) {
1444
17
                filter_projections->emplace(root_projection.column_id(),
1445
17
                                            std::move(root_projection));
1446
17
                continue;
1447
17
            }
1448
4
            RETURN_IF_ERROR(
1449
4
                    merge_local_column_index(&filter_projection_it->second, root_projection));
1450
4
        }
1451
69
    }
1452
130
    return Status::OK();
1453
130
}
1454
1455
140
static void rebuild_projection(ColumnMapping* mapping, LocalIndex block_position) {
1456
140
    DORIS_CHECK(mapping->file_local_id.has_value());
1457
140
    if (mapping->is_trivial || needs_complex_rematerialize(*mapping)) {
1458
124
        mapping->projection = VExprContext::create_shared(VSlotRef::create_shared(
1459
124
                cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1,
1460
124
                mapping->file_type, mapping->file_column_name));
1461
124
        return;
1462
124
    }
1463
1464
16
    auto expr = Cast::create_shared(mapping->table_type);
1465
16
    expr->add_child(VSlotRef::create_shared(cast_set<int>(block_position.value()),
1466
16
                                            cast_set<int>(block_position.value()), -1,
1467
16
                                            mapping->file_type, mapping->file_column_name));
1468
16
    mapping->projection = VExprContext::create_shared(expr);
1469
16
}
1470
1471
// Build file slot rewrite info from the localized filter targets. Only local targets can enter
1472
// file-reader expressions; constant and unset targets stay above the file reader.
1473
static std::map<GlobalIndex, FileSlotRewriteInfo> build_file_slot_rewrite_map(
1474
        const std::vector<ColumnMapping>& mappings,
1475
130
        const std::map<GlobalIndex, FilterEntry>& filter_entries) {
1476
130
    std::map<GlobalIndex, FileSlotRewriteInfo> global_to_file_slot;
1477
173
    for (const auto& mapping : mappings) {
1478
173
        const auto entry_it = filter_entries.find(mapping.global_index);
1479
173
        if (entry_it == filter_entries.end() || !entry_it->second.is_local()) {
1480
34
            continue;
1481
34
        }
1482
139
        DORIS_CHECK(mapping.file_local_id.has_value());
1483
139
        global_to_file_slot.emplace(
1484
139
                mapping.global_index,
1485
139
                FileSlotRewriteInfo {.block_position = entry_it->second.local_index().value(),
1486
139
                                     .file_type = mapping.file_type,
1487
139
                                     .table_type = mapping.table_type,
1488
139
                                     .file_column_name = mapping.file_column_name});
1489
139
    }
1490
130
    return global_to_file_slot;
1491
130
}
1492
1493
Status TableColumnMapper::_create_by_index_mapping(const ColumnDefinition& table_column,
1494
                                                   const std::vector<ColumnDefinition>& file_schema,
1495
19
                                                   ColumnMapping* mapping) {
1496
19
    DORIS_CHECK(mapping != nullptr);
1497
19
    DORIS_CHECK(!table_column.is_partition_key);
1498
1499
    // Key contract: in BY_INDEX mode, `ColumnDefinition::identifier` TYPE_INT is interpreted as the
1500
    // 0-based position of this column inside `file_schema`. FE writes the physical file position
1501
    // of each non-partition projected column into that identifier. This interpretation allows:
1502
    //   - sparse projection: read only a subset of file columns (for example only `_col2`
1503
    //     and `_col4`);
1504
    //   - column reordering: table column order differs from file column order;
1505
    //   - no many-to-one mapping: FE must guarantee that each file position is referenced by at
1506
    //     most one table column.
1507
19
    const auto file_index = table_column.get_identifier_position();
1508
1509
    // Case A: file_index is in range, so build a direct positional mapping.
1510
    // The file column name (for example `_col0`) is intentionally ignored here.
1511
19
    if (file_index >= 0 && static_cast<size_t>(file_index) < file_schema.size()) {
1512
16
        return _create_direct_mapping(table_column, file_schema[static_cast<size_t>(file_index)],
1513
16
                                      mapping);
1514
16
    }
1515
1516
    // Case B: file_index is out of range, which means the file does not contain this column.
1517
    // Route it through the missing-column path used by schema evolution.
1518
3
    if (table_column.default_expr != nullptr) {
1519
1
        _set_constant_mapping(mapping, table_column.default_expr);
1520
1
        return Status::OK();
1521
1
    }
1522
    // Keep the mapping empty (`file_local_id` remains `nullopt`) and let the upper finalize
1523
    // stage fill NULL/default values.
1524
2
    return Status::OK();
1525
3
}
1526
1527
17
void TableColumnMapper::_set_constant_mapping(ColumnMapping* mapping, VExprContextSPtr expr) {
1528
17
    DORIS_CHECK(mapping != nullptr);
1529
17
    DORIS_CHECK(expr != nullptr);
1530
17
    mapping->default_expr = std::move(expr);
1531
17
    mapping->constant_index = _constant_map.add(ConstantEntry {
1532
17
            .global_index = mapping->global_index,
1533
17
            .expr = mapping->default_expr,
1534
17
            .type = mapping->table_type,
1535
17
    });
1536
17
    mapping->filter_conversion = FilterConversionType::CONSTANT;
1537
17
}
1538
1539
Status TableColumnMapper::_create_mapping_for_column(const ColumnDefinition& table_column,
1540
                                                     GlobalIndex global_index,
1541
230
                                                     ColumnMapping* mapping) {
1542
230
    DORIS_CHECK(mapping != nullptr);
1543
230
    *mapping = ColumnMapping {};
1544
230
    mapping->global_index = global_index;
1545
230
    mapping->table_column_name = table_column.name;
1546
230
    mapping->table_type = table_column.type;
1547
230
    const auto row_lineage_type = row_lineage_virtual_column_type(table_column, _options.mode);
1548
230
    if (const auto* partition_value = find_partition_value(table_column, _partition_values);
1549
230
        table_column.is_partition_key && partition_value != nullptr) {
1550
        // Partition values are split constants and must take precedence over defaults.
1551
10
        _set_constant_mapping(mapping, VExprContext::create_shared(VLiteral::create_shared(
1552
10
                                               mapping->table_type, *partition_value)));
1553
220
    } else if (_options.mode == TableColumnMappingMode::BY_INDEX &&
1554
220
               !table_column.is_partition_key && table_column.has_identifier_field_id()) {
1555
        // BY_INDEX interprets ColumnDefinition::identifier as physical file position.
1556
19
        RETURN_IF_ERROR(_create_by_index_mapping(table_column, _file_schema, mapping));
1557
201
    } else if (const auto* file_field = _find_file_field(table_column, _file_schema)) {
1558
        // Normal physical file column mapping.
1559
171
        RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, mapping));
1560
170
        if (row_lineage_type != TableVirtualColumnType::INVALID) {
1561
            // Iceberg v3 rewritten files may physically contain row lineage metadata fields.
1562
            // File non-null values must be preserved, while file NULLs still inherit from data file
1563
            // metadata in IcebergTableReader. Therefore the mapping has a real file source plus a
1564
            // virtual post-materialization step, and filters must wait for finalize output.
1565
10
            mapping->virtual_column_type = row_lineage_type;
1566
10
            mapping->filter_conversion = FilterConversionType::FINALIZE_ONLY;
1567
10
        }
1568
170
    } else if (row_lineage_type != TableVirtualColumnType::INVALID) {
1569
        // Iceberg row lineage metadata fields are optional in data files. Missing fields are exposed
1570
        // as all-NULL table columns first; IcebergTableReader fills inherited values only when the
1571
        // split carries first_row_id / last_updated_sequence_number metadata.
1572
        // FE may attach a default_expr to these hidden metadata columns, but the Iceberg v3
1573
        // inheritance rule must take precedence over the generic missing-column default path.
1574
13
        mapping->virtual_column_type = row_lineage_type;
1575
17
    } else if (table_column.name == BeConsts::ICEBERG_ROWID_COL) {
1576
        // Doris internal Iceberg row locator is never a physical Iceberg data column. It is built
1577
        // from file path, row position and partition metadata for delete/update/merge.
1578
2
        mapping->virtual_column_type = TableVirtualColumnType::ICEBERG_ROWID;
1579
15
    } else if (table_column.default_expr != nullptr) {
1580
        // Missing schema-evolution column with an explicit default expression.
1581
6
        _set_constant_mapping(mapping, table_column.default_expr);
1582
9
    } else {
1583
9
        if (table_column.is_partition_key) {
1584
0
            return Status::InvalidArgument(
1585
0
                    "Table column '{}' (global_index={}) does not have a matching partition value",
1586
0
                    table_column.name, mapping->global_index.value());
1587
0
        }
1588
9
    }
1589
229
    return Status::OK();
1590
230
}
1591
1592
Status TableColumnMapper::_create_hidden_filter_mapping(const ColumnDefinition& table_column,
1593
                                                        GlobalIndex global_index,
1594
2
                                                        ColumnMapping* mapping) {
1595
2
    auto status = _create_mapping_for_column(table_column, global_index, mapping);
1596
2
    if (mapping->file_local_id.has_value() || mapping->constant_index.has_value() ||
1597
2
        mapping->virtual_column_type != TableVirtualColumnType::INVALID) {
1598
0
        return Status::OK();
1599
0
    }
1600
2
    if (_options.mode == TableColumnMappingMode::BY_NAME) {
1601
0
        return status;
1602
0
    }
1603
1604
    // Predicate-only slot refs carry the table name/type but do not carry the table-format field
1605
    // id used by BY_FIELD_ID or the file position used by BY_INDEX. Use a name fallback only for
1606
    // hidden filter localization; projected columns still obey the requested mapping mode.
1607
2
    const auto* file_field =
1608
2
            matcher_for_mode(TableColumnMappingMode::BY_NAME).find(table_column, _file_schema);
1609
2
    if (file_field == nullptr) {
1610
0
        return status;
1611
0
    }
1612
2
    ColumnMapping fallback_mapping;
1613
2
    fallback_mapping.global_index = global_index;
1614
2
    fallback_mapping.table_column_name = table_column.name;
1615
2
    fallback_mapping.table_type = table_column.type;
1616
2
    RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &fallback_mapping));
1617
2
    *mapping = std::move(fallback_mapping);
1618
2
    return Status::OK();
1619
2
}
1620
1621
Status TableColumnMapper::_build_hidden_filter_mappings(
1622
122
        const std::vector<TableFilter>& table_filters) {
1623
122
    _hidden_mappings.clear();
1624
1625
122
    std::map<GlobalIndex, ColumnDefinition> filter_columns;
1626
122
    for (const auto& table_filter : table_filters) {
1627
65
        if (table_filter.conjunct != nullptr) {
1628
63
            collect_top_level_slot_columns(table_filter.conjunct->root(), &filter_columns);
1629
63
        }
1630
65
    }
1631
1632
    // TableColumnPredicates only carry GlobalIndex and predicate objects. They do not provide the
1633
    // top-level column name/type needed to build a hidden mapping, so a predicate-only column can
1634
    // be hidden-mapped only when the same root slot also appears in a conjunct.
1635
122
    for (const auto& [global_index, table_column] : filter_columns) {
1636
64
        if (_find_mapping(global_index) != nullptr) {
1637
            // Ignore columns that are already mapped by the projected columns
1638
62
            continue;
1639
62
        }
1640
2
        ColumnMapping mapping;
1641
2
        RETURN_IF_ERROR(_create_hidden_filter_mapping(table_column, global_index, &mapping));
1642
2
        if (mapping.file_local_id.has_value() || mapping.constant_index.has_value() ||
1643
2
            mapping.virtual_column_type != TableVirtualColumnType::INVALID) {
1644
2
            _hidden_mappings.push_back(std::move(mapping));
1645
2
        }
1646
2
    }
1647
122
    return Status::OK();
1648
122
}
1649
1650
Status TableColumnMapper::create_mapping(const std::vector<ColumnDefinition>& projected_columns,
1651
                                         const std::map<std::string, Field>& partition_values,
1652
160
                                         const std::vector<ColumnDefinition>& file_schema) {
1653
160
    clear();
1654
160
    _partition_values = partition_values;
1655
160
    _file_schema = file_schema;
1656
387
    for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) {
1657
228
        ColumnMapping mapping;
1658
228
        RETURN_IF_ERROR(_create_mapping_for_column(projected_columns[column_idx],
1659
228
                                                   GlobalIndex(column_idx), &mapping));
1660
227
        _mappings.push_back(std::move(mapping));
1661
227
    }
1662
159
    return Status::OK();
1663
160
}
1664
1665
390
std::vector<ColumnMapping> TableColumnMapper::_filter_visible_mappings() const {
1666
390
    std::vector<ColumnMapping> mappings;
1667
390
    mappings.reserve(_mappings.size() + _hidden_mappings.size());
1668
390
    mappings.insert(mappings.end(), _mappings.begin(), _mappings.end());
1669
390
    mappings.insert(mappings.end(), _hidden_mappings.begin(), _hidden_mappings.end());
1670
390
    return mappings;
1671
390
}
1672
1673
130
Status TableColumnMapper::_build_filter_entries(const FileScanRequest& file_request) {
1674
130
    _filter_entries.clear();
1675
130
    const auto mappings = _filter_visible_mappings();
1676
173
    for (const auto& mapping : mappings) {
1677
173
        FilterEntry entry;
1678
173
        if (mapping.constant_index.has_value()) {
1679
10
            entry = FilterEntry::constant(*mapping.constant_index);
1680
163
        } else if (mapping.file_local_id.has_value() &&
1681
163
                   filter_conversion_has_local_source(mapping.filter_conversion)) {
1682
140
            const auto local_position_it =
1683
140
                    file_request.local_positions.find(LocalColumnId(*mapping.file_local_id));
1684
140
            if (local_position_it != file_request.local_positions.end()) {
1685
139
                entry = FilterEntry::local(local_position_it->second);
1686
139
            }
1687
140
        }
1688
173
        _filter_entries.emplace(mapping.global_index, entry);
1689
173
    }
1690
130
    return Status::OK();
1691
130
}
1692
1693
Status TableColumnMapper::create_scan_request(
1694
        const std::vector<TableFilter>& table_filters,
1695
        const TableColumnPredicates& table_column_predicates,
1696
        const std::vector<ColumnDefinition>& projected_columns, FileScanRequest* file_request,
1697
122
        RuntimeState* runtime_state) {
1698
    // FileReader evaluates expressions against a file-local block. This mapper owns the
1699
    // table-column to file-column conversion, so it also owns the file-local block positions.
1700
122
    file_request->predicate_columns.clear();
1701
122
    file_request->non_predicate_columns.clear();
1702
122
    file_request->local_positions.clear();
1703
122
    file_request->conjuncts.clear();
1704
122
    file_request->delete_conjuncts.clear();
1705
122
    file_request->column_predicate_filters.clear();
1706
122
    _filter_entries.clear();
1707
    // 1. Build referenced non-predicate columns
1708
284
    for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) {
1709
162
        const auto global_index = GlobalIndex(column_idx);
1710
162
        auto* mapping = _find_mapping(global_index);
1711
162
        if (mapping != nullptr && mapping->file_local_id.has_value()) {
1712
            // A file column can be read lazily as a non-predicate column only when it is not used
1713
            // by row-level expression filters. Single-column ColumnPredicate filters are pruning
1714
            // hints only and must not force row-level predicate materialization.
1715
140
            bool used_by_filter = false;
1716
140
            for (const auto& table_filter : table_filters) {
1717
79
                const auto& global_indices = table_filter.global_indices;
1718
79
                if (std::find(global_indices.begin(), global_indices.end(), global_index) !=
1719
79
                            global_indices.end() &&
1720
79
                    filter_conversion_has_local_source(mapping->filter_conversion)) {
1721
55
                    used_by_filter = true;
1722
55
                    break;
1723
55
                }
1724
79
            }
1725
140
            if (!used_by_filter || !enable_lazy_materialization()) {
1726
87
                RETURN_IF_ERROR(add_scan_column(file_request, mapping, false,
1727
87
                                                force_full_complex_scan_projection()));
1728
87
            }
1729
140
        }
1730
162
    }
1731
    // 2. Build referenced predicate columns
1732
    // Hidden filter mappings must be built before localizing filters, so that they can be localized together with visible mappings and referenced by localized filter expressions.
1733
122
    RETURN_IF_ERROR(_build_hidden_filter_mappings(table_filters));
1734
122
    RETURN_IF_ERROR(
1735
122
            localize_filters(table_filters, table_column_predicates, file_request, runtime_state));
1736
    // 3. Rebuild output projection expressions for projected columns. localize_filters() has
1737
    // already applied the final scan projection to mapping.file_type/projected_file_children before
1738
    // rewriting filter expressions.
1739
162
    for (auto& mapping : _mappings) {
1740
162
        if (!mapping.file_local_id.has_value()) {
1741
22
            continue;
1742
22
        }
1743
140
        auto position_it =
1744
140
                file_request->local_positions.find(LocalColumnId(*mapping.file_local_id));
1745
140
        DORIS_CHECK(position_it != file_request->local_positions.end())
1746
0
                << file_request->local_positions.size() << " " << *mapping.file_local_id << " "
1747
0
                << mapping.file_column_name;
1748
140
        rebuild_projection(&mapping, position_it->second);
1749
140
    }
1750
122
    return Status::OK();
1751
122
}
1752
1753
310
ColumnMapping* TableColumnMapper::_find_mapping(GlobalIndex global_index) {
1754
400
    for (auto& mapping : _mappings) {
1755
400
        if (mapping.global_index == global_index) {
1756
306
            return &mapping;
1757
306
        }
1758
400
    }
1759
4
    return nullptr;
1760
310
}
1761
1762
84
ColumnMapping* TableColumnMapper::_find_filter_mapping(GlobalIndex global_index) {
1763
84
    if (auto* mapping = _find_mapping(global_index); mapping != nullptr) {
1764
82
        return mapping;
1765
82
    }
1766
2
    for (auto& mapping : _hidden_mappings) {
1767
2
        if (mapping.global_index == global_index) {
1768
2
            return &mapping;
1769
2
        }
1770
2
    }
1771
0
    return nullptr;
1772
2
}
1773
1774
Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table_filters,
1775
                                           const TableColumnPredicates& table_column_predicates,
1776
                                           FileScanRequest* file_request,
1777
130
                                           RuntimeState* runtime_state) {
1778
130
    FilterProjectionMap filter_projections;
1779
130
    auto filter_mappings = _filter_visible_mappings();
1780
130
    RETURN_IF_ERROR(build_nested_struct_filter_projection_map(table_filters, filter_mappings,
1781
130
                                                              &filter_projections));
1782
130
    for (const auto& table_filter : table_filters) {
1783
74
        for (const auto& global_index : table_filter.global_indices) {
1784
74
            auto* mapping = _find_filter_mapping(global_index);
1785
74
            if (mapping == nullptr || !mapping->file_local_id.has_value() ||
1786
74
                !filter_conversion_has_local_source(mapping->filter_conversion)) {
1787
11
                continue;
1788
11
            }
1789
63
            RETURN_IF_ERROR(add_scan_column(file_request, mapping, enable_lazy_materialization(),
1790
63
                                            force_full_complex_scan_projection(),
1791
63
                                            &filter_projections));
1792
63
        }
1793
71
    }
1794
    // Rebuild the file type for every scan-local mapping before expression rewrite. Predicate-only
1795
    // hidden mappings must see the same projected file type as the file reader will produce.
1796
171
    for (auto& mapping : _mappings) {
1797
171
        if (mapping.file_local_id.has_value() &&
1798
171
            file_request->local_positions.contains(LocalColumnId(*mapping.file_local_id))) {
1799
147
            RETURN_IF_ERROR(apply_scan_projection_to_mapping_file_type(*file_request, &mapping));
1800
147
        }
1801
171
    }
1802
130
    for (auto& mapping : _hidden_mappings) {
1803
2
        if (mapping.file_local_id.has_value() &&
1804
2
            file_request->local_positions.contains(LocalColumnId(*mapping.file_local_id))) {
1805
2
            RETURN_IF_ERROR(apply_scan_projection_to_mapping_file_type(*file_request, &mapping));
1806
2
        }
1807
2
    }
1808
130
    RETURN_IF_ERROR(_build_filter_entries(*file_request));
1809
1810
    // Build the complete table-slot rewrite map after all predicate columns have been assigned.
1811
    // This keeps expression localization independent from filter iteration order.
1812
130
    filter_mappings = _filter_visible_mappings();
1813
130
    const auto global_to_file_slot = build_file_slot_rewrite_map(filter_mappings, _filter_entries);
1814
130
    for (const auto& table_filter : table_filters) {
1815
71
        if (table_filter.conjunct != nullptr &&
1816
71
            table_filter_has_only_local_entries(table_filter, _filter_entries)) {
1817
58
            RewriteContext rewrite_context {.runtime_state = runtime_state};
1818
58
            VExprSPtr rewrite_root;
1819
58
            Status clone_status;
1820
58
            try {
1821
58
                clone_status = clone_table_expr_tree(table_filter.conjunct->root(), &rewrite_root);
1822
58
            } catch (const Exception& e) {
1823
                // Some table filters contain complex intermediate values, for example
1824
                // `element_at(MAP_VALUES(m)[1], 'age') > 30`. The current file-local rewrite only
1825
                // understands top-level slots and struct-element paths rooted at top-level slots;
1826
                // cloning such expressions can hit the generic TExpr complex-type limitation.
1827
                // Leave them above TableReader, where Scanner evaluates the original table-level
1828
                // conjunct after final materialization.
1829
0
#ifndef NDEBUG
1830
0
                return Status::InternalError(
1831
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1832
0
                        e.to_string(), table_filter.conjunct->root()->debug_string());
1833
#else
1834
                continue;
1835
#endif
1836
0
            } catch (const std::exception& e) {
1837
0
#ifndef NDEBUG
1838
0
                return Status::InternalError(
1839
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1840
0
                        e.what(), table_filter.conjunct->root()->debug_string());
1841
#else
1842
                continue;
1843
#endif
1844
0
            }
1845
58
            if (!clone_status.ok()) {
1846
0
#ifndef NDEBUG
1847
0
                return Status::InternalError(
1848
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1849
0
                        clone_status.to_string(), table_filter.conjunct->root()->debug_string());
1850
#else
1851
                continue;
1852
#endif
1853
0
            }
1854
58
            bool can_localize = true;
1855
58
            auto localized_root = rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot,
1856
58
                                                                  filter_mappings, &rewrite_context,
1857
58
                                                                  &can_localize);
1858
58
            if (!can_localize) {
1859
4
                continue;
1860
4
            }
1861
54
            auto localized_conjunct = VExprContext::create_shared(std::move(localized_root));
1862
54
            RETURN_IF_ERROR(rewrite_context.prepare_created_exprs(localized_conjunct.get()));
1863
54
            file_request->conjuncts.push_back(std::move(localized_conjunct));
1864
54
        }
1865
71
    }
1866
130
    if (enable_column_predicate_filters()) {
1867
126
        for (const auto& [global_index, predicates] : table_column_predicates) {
1868
10
            const auto* mapping = _find_filter_mapping(global_index);
1869
10
            const auto entry_it = _filter_entries.find(global_index);
1870
10
            if (mapping == nullptr || !mapping->file_local_id.has_value() || predicates.empty() ||
1871
10
                entry_it == _filter_entries.end() || !entry_it->second.is_local() ||
1872
10
                !column_predicate_can_use_local_source(mapping->filter_conversion) ||
1873
10
                mapping->file_type == nullptr) {
1874
1
                continue;
1875
1
            }
1876
9
            FileColumnPredicateFilter column_predicate_filter;
1877
9
            column_predicate_filter.file_column_id = LocalColumnId(*mapping->file_local_id);
1878
9
            column_predicate_filter.target =
1879
9
                    FileNestedPredicateTarget(column_predicate_filter.file_column_id);
1880
9
            const auto file_primitive_type =
1881
9
                    remove_nullable(mapping->file_type)->get_primitive_type();
1882
9
            for (const auto& predicate : predicates) {
1883
9
                DORIS_CHECK(predicate != nullptr);
1884
9
                if (predicate->primitive_type() == file_primitive_type) {
1885
9
                    column_predicate_filter.predicates.push_back(predicate);
1886
9
                }
1887
9
            }
1888
9
            if (column_predicate_filter.predicates.empty()) {
1889
0
                continue;
1890
0
            }
1891
9
            file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
1892
9
        }
1893
126
        for (const auto& table_filter : table_filters) {
1894
69
            if (table_filter.conjunct == nullptr ||
1895
69
                !table_filter_has_only_local_entries(table_filter, _filter_entries)) {
1896
13
                continue;
1897
13
            }
1898
56
            std::vector<FileColumnPredicateFilter> nested_column_predicate_filters;
1899
56
            collect_nested_column_predicate_filters(table_filter.conjunct->root(), filter_mappings,
1900
56
                                                    &nested_column_predicate_filters);
1901
56
            for (auto& column_predicate_filter : nested_column_predicate_filters) {
1902
12
                merge_column_predicate_filter(std::move(column_predicate_filter),
1903
12
                                              &file_request->column_predicate_filters);
1904
12
            }
1905
56
        }
1906
126
    }
1907
130
    return Status::OK();
1908
130
}
1909
1910
const ColumnDefinition* TableColumnMapper::_find_file_field(
1911
        const ColumnDefinition& table_column,
1912
201
        const std::vector<ColumnDefinition>& file_schema) const {
1913
201
    if (table_column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1914
2
        const auto field_it = std::ranges::find_if(file_schema, [](const ColumnDefinition& field) {
1915
2
            return field.column_type == ColumnType::GLOBAL_ROWID;
1916
2
        });
1917
1
        return field_it == file_schema.end() ? nullptr : &*field_it;
1918
1
    }
1919
200
    return matcher_for_mode(_options.mode).find(table_column, file_schema);
1920
201
}
1921
1922
Status TableColumnMapper::_create_direct_mapping(const ColumnDefinition& table_column,
1923
                                                 const ColumnDefinition& file_field,
1924
292
                                                 ColumnMapping* mapping) const {
1925
292
    DORIS_CHECK(mapping != nullptr);
1926
292
    DORIS_CHECK(file_field.local_id >= 0 || file_field.local_id == GLOBAL_ROWID_COLUMN_ID);
1927
292
    mapping->file_local_id = file_field.local_id;
1928
292
    mapping->table_column_name = table_column.name;
1929
292
    mapping->file_column_name = file_field.name;
1930
292
    mapping->original_file_type = file_field.type;
1931
292
    mapping->original_file_children = file_field.children;
1932
292
    mapping->projected_file_children = file_field.children;
1933
292
    mapping->file_type = file_field.type;
1934
292
    mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
1935
292
    mapping->filter_conversion = mapping->is_trivial ? FilterConversionType::COPY_DIRECTLY
1936
292
                                                     : FilterConversionType::CAST_FILTER;
1937
292
    mapping->child_mappings.clear();
1938
1939
292
    auto table_children = table_column.children;
1940
292
    const auto nested_table_type = remove_nullable(mapping->table_type);
1941
    // Some scan paths, especially SELECT *, only carry the complete complex DataType for a table
1942
    // column and leave ColumnDefinition::children empty. If the file type is an older complex
1943
    // schema, treating this as a leaf mapping would make TableReader fall back to a plain CAST.
1944
    // That is invalid for evolved structs with different field counts.
1945
    //
1946
    // Example:
1947
    //   table column type: Map(String, Struct(age, full_name, gender))
1948
    //   old file type:    Map(String, Struct(age, name))
1949
    //   table children:   empty
1950
    //
1951
    // Synthesize key/value/struct-field children from the table type so the normal recursive
1952
    // mapping path can rematerialize `name -> full_name` and fill missing `gender` with defaults,
1953
    // instead of trying to CAST Struct(age, name) to Struct(age, full_name, gender).
1954
292
    const bool synthesized_table_children =
1955
292
            table_children.empty() && is_complex_type(nested_table_type->get_primitive_type()) &&
1956
292
            !mapping->table_type->equals(*mapping->file_type);
1957
292
    if (synthesized_table_children) {
1958
4
        table_children = synthesize_complex_children_from_type(mapping->table_type);
1959
288
    } else if (!table_children.empty() && !mapping->table_type->equals(*mapping->file_type)) {
1960
57
        complete_required_complex_children_from_type(mapping->table_type, &table_children);
1961
57
    }
1962
1963
292
    if (!table_children.empty()) {
1964
76
        if (!is_complex_type(remove_nullable(mapping->file_type)->get_primitive_type())) {
1965
0
            return Status::NotSupported(
1966
0
                    "Cannot map complex table column '{}' to scalar parquet column '{}', table "
1967
0
                    "type={}, file type={}",
1968
0
                    table_column.name, file_field.name, mapping->table_type->get_name(),
1969
0
                    mapping->file_type->get_name());
1970
0
        }
1971
76
        RETURN_IF_ERROR(validate_file_schema_children(file_field));
1972
75
        std::vector<int32_t> synthesized_used_file_child_ids;
1973
193
        for (size_t table_child_idx = 0; table_child_idx < table_children.size();
1974
118
             ++table_child_idx) {
1975
118
            const auto& table_child = table_children[table_child_idx];
1976
118
            const auto* file_child =
1977
118
                    find_file_child_for_mapping(table_child, file_field, _options.mode,
1978
118
                                                table_child_idx, synthesized_table_children);
1979
118
            if (synthesized_table_children && file_child != nullptr) {
1980
8
                const auto file_child_id = file_child->file_local_id();
1981
8
                if (std::ranges::find(synthesized_used_file_child_ids, file_child_id) !=
1982
8
                    synthesized_used_file_child_ids.end()) {
1983
2
                    file_child = nullptr;
1984
2
                    for (const auto& candidate : file_field.children) {
1985
2
                        const auto candidate_id = candidate.file_local_id();
1986
2
                        if (std::ranges::find(synthesized_used_file_child_ids, candidate_id) ==
1987
2
                            synthesized_used_file_child_ids.end()) {
1988
2
                            file_child = &candidate;
1989
2
                            break;
1990
2
                        }
1991
2
                    }
1992
2
                }
1993
8
                if (file_child != nullptr) {
1994
8
                    synthesized_used_file_child_ids.push_back(file_child->file_local_id());
1995
8
                }
1996
8
            }
1997
118
            if (file_child == nullptr) {
1998
15
                ColumnMapping child_mapping;
1999
15
                child_mapping.table_column_name = table_child.name;
2000
15
                child_mapping.file_column_name = table_child.name;
2001
15
                child_mapping.table_type = table_child.type;
2002
15
                child_mapping.file_type = table_child.type;
2003
15
                child_mapping.filter_conversion = FilterConversionType::FINALIZE_ONLY;
2004
15
                mapping->child_mappings.push_back(std::move(child_mapping));
2005
15
                continue;
2006
15
            }
2007
103
            ColumnMapping child_mapping;
2008
103
            child_mapping.table_column_name = table_child.name;
2009
103
            child_mapping.table_type = table_child.type;
2010
103
            RETURN_IF_ERROR(_create_direct_mapping(table_child, *file_child, &child_mapping));
2011
103
            mapping->child_mappings.push_back(std::move(child_mapping));
2012
103
        }
2013
75
        if (needs_projected_file_type_rebuild(*mapping)) {
2014
            // If complex projection prunes some children, we have to rebuild the projected file type to make sure the reader expression can find the correct child types by name.
2015
62
            RETURN_IF_ERROR(rebuild_projected_file_children_and_type(
2016
62
                    mapping->file_type, mapping->original_file_children, mapping->child_mappings,
2017
62
                    &mapping->projected_file_children, &mapping->file_type));
2018
62
            DCHECK(mapping->table_type != nullptr);
2019
62
            mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
2020
            // TODO: ? READER_EXPRESSION
2021
62
            mapping->filter_conversion = mapping->is_trivial
2022
62
                                                 ? FilterConversionType::COPY_DIRECTLY
2023
62
                                                 : FilterConversionType::READER_EXPRESSION;
2024
62
        }
2025
75
    }
2026
291
    return Status::OK();
2027
292
}
2028
2029
} // namespace doris::format