Coverage Report

Created: 2026-07-02 15:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/column_mapper.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/column_mapper.h"
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <memory>
23
#include <sstream>
24
#include <string_view>
25
#include <utility>
26
#include <vector>
27
28
#include "common/consts.h"
29
#include "common/exception.h"
30
#include "common/status.h"
31
#include "core/data_type/convert_field_to_type.h"
32
#include "core/data_type/data_type_array.h"
33
#include "core/data_type/data_type_map.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_string.h"
36
#include "core/data_type/data_type_struct.h"
37
#include "core/data_type/primitive_type.h"
38
#include "exprs/runtime_filter_expr.h"
39
#include "exprs/short_circuit_evaluation_expr.h"
40
#include "exprs/vcase_expr.h"
41
#include "exprs/vcast_expr.h"
42
#include "exprs/vcondition_expr.h"
43
#include "exprs/vectorized_fn_call.h"
44
#include "exprs/vexpr_context.h"
45
#include "exprs/vin_predicate.h"
46
#include "exprs/vliteral.h"
47
#include "format_v2/column_mapper_nested.h"
48
#include "format_v2/expr/cast.h"
49
#include "format_v2/file_reader.h"
50
#include "format_v2/schema_projection.h"
51
#include "format_v2/table_reader.h"
52
#include "gen_cpp/Exprs_types.h"
53
54
namespace doris::format {
55
56
namespace {
57
58
16
std::string mapping_mode_to_string(TableColumnMappingMode mode) {
59
16
    switch (mode) {
60
1
    case TableColumnMappingMode::BY_FIELD_ID:
61
1
        return "BY_FIELD_ID";
62
14
    case TableColumnMappingMode::BY_NAME:
63
14
        return "BY_NAME";
64
1
    case TableColumnMappingMode::BY_INDEX:
65
1
        return "BY_INDEX";
66
16
    }
67
0
    return "UNKNOWN";
68
16
}
69
70
13.0M
bool column_has_name(const ColumnDefinition& column, const std::string& name) {
71
13.0M
    if (to_lower(column.name) == to_lower(name)) {
72
290k
        return true;
73
290k
    }
74
12.8M
    if (column.has_identifier_name() && to_lower(column.get_identifier_name()) == to_lower(name)) {
75
0
        return true;
76
0
    }
77
12.7M
    return std::ranges::any_of(column.name_mapping, [&](const std::string& alias) {
78
1
        return to_lower(alias) == to_lower(name);
79
1
    });
80
12.7M
}
81
82
6.79M
bool column_names_match(const ColumnDefinition& lhs, const ColumnDefinition& rhs) {
83
6.79M
    if (column_has_name(rhs, lhs.name)) {
84
290k
        return true;
85
290k
    }
86
6.50M
    if (lhs.has_identifier_name() && column_has_name(rhs, lhs.get_identifier_name())) {
87
1
        return true;
88
1
    }
89
6.50M
    return std::ranges::any_of(lhs.name_mapping, [&](const std::string& alias) {
90
18
        return column_has_name(rhs, alias);
91
18
    });
92
6.50M
}
93
94
class ColumnMatcher {
95
public:
96
3
    virtual ~ColumnMatcher() = default;
97
    virtual const ColumnDefinition* find(
98
            const ColumnDefinition& table_column,
99
            const std::vector<ColumnDefinition>& file_schema) const = 0;
100
};
101
102
class FieldIdMatcher final : public ColumnMatcher {
103
public:
104
    const ColumnDefinition* find(const ColumnDefinition& table_column,
105
75.3k
                                 const std::vector<ColumnDefinition>& file_schema) const override {
106
75.3k
        if (!table_column.has_identifier_field_id()) {
107
7
            return nullptr;
108
7
        }
109
75.3k
        const auto field_id = table_column.get_identifier_field_id();
110
469k
        const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) {
111
469k
            return field.has_identifier_field_id() && field.get_identifier_field_id() == field_id;
112
469k
        });
113
75.3k
        return field_it == file_schema.end() ? nullptr : &*field_it;
114
75.3k
    }
115
};
116
117
class NameMatcher final : public ColumnMatcher {
118
public:
119
    const ColumnDefinition* find(const ColumnDefinition& table_column,
120
290k
                                 const std::vector<ColumnDefinition>& file_schema) const override {
121
6.79M
        const auto field_it = std::ranges::find_if(file_schema, [&](const ColumnDefinition& field) {
122
6.79M
            return column_names_match(table_column, field);
123
6.79M
        });
124
290k
        return field_it == file_schema.end() ? nullptr : &*field_it;
125
290k
    }
126
};
127
128
class PositionMatcher final : public ColumnMatcher {
129
public:
130
    const ColumnDefinition* find(const ColumnDefinition& table_column,
131
2
                                 const std::vector<ColumnDefinition>& file_schema) const override {
132
2
        if (!table_column.has_identifier_field_id()) {
133
2
            return nullptr;
134
2
        }
135
0
        const auto position = table_column.get_identifier_position();
136
0
        if (position < 0 || static_cast<size_t>(position) >= file_schema.size()) {
137
0
            return nullptr;
138
0
        }
139
0
        return &file_schema[static_cast<size_t>(position)];
140
0
    }
141
};
142
143
366k
const ColumnMatcher& matcher_for_mode(TableColumnMappingMode mode) {
144
366k
    static const FieldIdMatcher field_id_matcher;
145
366k
    static const NameMatcher name_matcher;
146
366k
    static const PositionMatcher position_matcher;
147
366k
    switch (mode) {
148
75.3k
    case TableColumnMappingMode::BY_FIELD_ID:
149
75.3k
        return field_id_matcher;
150
291k
    case TableColumnMappingMode::BY_NAME:
151
291k
        return name_matcher;
152
2
    case TableColumnMappingMode::BY_INDEX:
153
2
        return position_matcher;
154
366k
    }
155
0
    return field_id_matcher;
156
366k
}
157
158
12
std::string virtual_column_type_to_string(TableVirtualColumnType type) {
159
12
    switch (type) {
160
9
    case TableVirtualColumnType::INVALID:
161
9
        return "INVALID";
162
1
    case TableVirtualColumnType::ROW_ID:
163
1
        return "ROW_ID";
164
1
    case TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
165
1
        return "LAST_UPDATED_SEQUENCE_NUMBER";
166
1
    case TableVirtualColumnType::ICEBERG_ROWID:
167
1
        return "ICEBERG_ROWID";
168
12
    }
169
0
    return "UNKNOWN";
170
12
}
171
172
12
std::string filter_conversion_type_to_string(FilterConversionType type) {
173
12
    switch (type) {
174
3
    case FilterConversionType::COPY_DIRECTLY:
175
3
        return "COPY_DIRECTLY";
176
1
    case FilterConversionType::CAST_FILTER:
177
1
        return "CAST_FILTER";
178
1
    case FilterConversionType::READER_EXPRESSION:
179
1
        return "READER_EXPRESSION";
180
6
    case FilterConversionType::FINALIZE_ONLY:
181
6
        return "FINALIZE_ONLY";
182
1
    case FilterConversionType::CONSTANT:
183
1
        return "CONSTANT";
184
12
    }
185
0
    return "UNKNOWN";
186
12
}
187
188
47
std::string data_type_debug_string(const DataTypePtr& type) {
189
47
    return type == nullptr ? "null" : type->get_name();
190
47
}
191
192
11
std::string field_debug_string(const Field& field) {
193
11
    std::ostringstream out;
194
11
    out << "Field{type=" << type_to_string(field.get_type()) << ", value=";
195
11
    switch (field.get_type()) {
196
0
    case TYPE_NULL:
197
0
        out << "null";
198
0
        break;
199
9
    case TYPE_INT:
200
9
        out << field.get<TYPE_INT>();
201
9
        break;
202
0
    case TYPE_BIGINT:
203
0
        out << field.get<TYPE_BIGINT>();
204
0
        break;
205
2
    case TYPE_STRING:
206
2
        out << field.get<TYPE_STRING>();
207
2
        break;
208
0
    default:
209
0
        out << field.to_debug_string(0);
210
0
        break;
211
11
    }
212
11
    out << "}";
213
11
    return out.str();
214
11
}
215
216
template <typename T, typename Formatter>
217
50
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
50
    std::ostringstream out;
219
50
    out << "[";
220
72
    for (size_t i = 0; i < values.size(); ++i) {
221
22
        if (i > 0) {
222
1
            out << ", ";
223
1
        }
224
22
        out << formatter(values[i]);
225
22
    }
226
50
    out << "]";
227
50
    return out.str();
228
50
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEEZNKS0_16ColumnDefinition12debug_stringEvE3$_0EES8_RKSt6vectorIT_SaISC_EET0_
Line
Count
Source
217
11
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
11
    std::ostringstream out;
219
11
    out << "[";
220
19
    for (size_t i = 0; i < values.size(); ++i) {
221
8
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
8
        out << formatter(values[i]);
225
8
    }
226
11
    out << "]";
227
11
    return out.str();
228
11
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16ColumnDefinitionEZNKS3_12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Line
Count
Source
217
11
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
11
    std::ostringstream out;
219
11
    out << "[";
220
12
    for (size_t i = 0; i < values.size(); ++i) {
221
1
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
1
        out << formatter(values[i]);
225
1
    }
226
11
    out << "]";
227
11
    return out.str();
228
11
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16LocalColumnIndexEZNKS3_12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Line
Count
Source
217
2
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
2
    std::ostringstream out;
219
2
    out << "[";
220
3
    for (size_t i = 0; i < values.size(); ++i) {
221
1
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
1
        out << formatter(values[i]);
225
1
    }
226
2
    out << "]";
227
2
    return out.str();
228
2
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_16ColumnDefinitionEZNKS0_13ColumnMapping12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Line
Count
Source
217
12
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
12
    std::ostringstream out;
219
12
    out << "[";
220
17
    for (size_t i = 0; i < values.size(); ++i) {
221
5
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
5
        out << formatter(values[i]);
225
5
    }
226
12
    out << "]";
227
12
    return out.str();
228
12
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS3_12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISC_EET0_
Line
Count
Source
217
12
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
12
    std::ostringstream out;
219
12
    out << "[";
220
17
    for (size_t i = 0; i < values.size(); ++i) {
221
5
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
5
        out << formatter(values[i]);
225
5
    }
226
12
    out << "]";
227
12
    return out.str();
228
12
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS0_17TableColumnMapper12debug_stringB5cxx11EvE3$_0EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Line
Count
Source
217
1
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
1
    std::ostringstream out;
219
1
    out << "[";
220
3
    for (size_t i = 0; i < values.size(); ++i) {
221
2
        if (i > 0) {
222
1
            out << ", ";
223
1
        }
224
2
        out << formatter(values[i]);
225
2
    }
226
1
    out << "]";
227
1
    return out.str();
228
1
}
column_mapper.cpp:_ZN5doris6format12_GLOBAL__N_118join_debug_stringsINS0_13ColumnMappingEZNKS0_17TableColumnMapper12debug_stringB5cxx11EvE3$_1EENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKSt6vectorIT_SaISD_EET0_
Line
Count
Source
217
1
std::string join_debug_strings(const std::vector<T>& values, Formatter formatter) {
218
1
    std::ostringstream out;
219
1
    out << "[";
220
1
    for (size_t i = 0; i < values.size(); ++i) {
221
0
        if (i > 0) {
222
0
            out << ", ";
223
0
        }
224
0
        out << formatter(values[i]);
225
0
    }
226
1
    out << "]";
227
1
    return out.str();
228
1
}
229
230
} // namespace
231
232
const Field* find_partition_value(const ColumnDefinition& table_column,
233
317k
                                  const std::map<std::string, Field>& partition_values) {
234
566k
    const auto find_by_name = [&](const std::string& name) -> const Field* {
235
566k
        const auto value_it = partition_values.find(name);
236
566k
        return value_it == partition_values.end() ? nullptr : &value_it->second;
237
566k
    };
238
317k
    if (const auto* value = find_by_name(table_column.name); value != nullptr) {
239
10.6k
        return value;
240
10.6k
    }
241
306k
    if (table_column.has_identifier_name()) {
242
249k
        if (const auto* value = find_by_name(table_column.get_identifier_name());
243
249k
            value != nullptr) {
244
0
            return value;
245
0
        }
246
249k
    }
247
306k
    for (const auto& alias : table_column.name_mapping) {
248
21
        if (const auto* value = find_by_name(alias); value != nullptr) {
249
1
            return value;
250
1
        }
251
21
    }
252
306k
    return nullptr;
253
306k
}
254
255
struct FileSlotRewriteInfo {
256
    size_t block_position = 0;
257
    DataTypePtr file_type;
258
    DataTypePtr table_type;
259
    std::string file_column_name;
260
};
261
262
struct RewriteContext {
263
    RuntimeState* runtime_state = nullptr;
264
    std::vector<VExprSPtr> created_exprs {};
265
266
22.0k
    void add_created_expr(VExprSPtr expr) { created_exprs.push_back(std::move(expr)); }
267
268
15.2k
    Status prepare_created_exprs(VExprContext* context) const {
269
15.2k
        DORIS_CHECK(context != nullptr);
270
15.2k
        RowDescriptor row_desc;
271
21.7k
        for (const auto& expr : created_exprs) {
272
21.7k
            if (dynamic_cast<const Cast*>(expr.get()) != nullptr && runtime_state == nullptr) {
273
0
                return Status::InvalidArgument(
274
0
                        "RuntimeState is required to prepare rewritten cast expression {}",
275
0
                        expr->expr_name());
276
0
            }
277
21.7k
            RETURN_IF_ERROR(expr->prepare(runtime_state, row_desc, context));
278
21.7k
        }
279
15.2k
        return Status::OK();
280
15.2k
    }
281
};
282
283
static VExprSPtr create_file_slot_ref(const VSlotRef& slot_ref,
284
                                      const FileSlotRewriteInfo& rewrite_info,
285
15.9k
                                      RewriteContext* rewrite_context) {
286
15.9k
    auto ref =
287
15.9k
            VSlotRef::create_shared(slot_ref.slot_id(), cast_set<int>(rewrite_info.block_position),
288
15.9k
                                    -1, rewrite_info.file_type, rewrite_info.file_column_name);
289
15.9k
    rewrite_context->add_created_expr(ref);
290
15.9k
    return ref;
291
15.9k
}
292
293
48.7k
static bool is_cast_expr(const VExprSPtr& expr) {
294
48.7k
    return dynamic_cast<const Cast*>(expr.get()) != nullptr;
295
48.7k
}
296
297
37.3k
static bool is_binary_comparison_predicate(const VExprSPtr& expr) {
298
37.3k
    if (expr == nullptr || expr->get_num_children() != 2 ||
299
37.3k
        (expr->node_type() != TExprNodeType::BINARY_PRED &&
300
27.2k
         expr->node_type() != TExprNodeType::NULL_AWARE_BINARY_PRED)) {
301
27.2k
        return false;
302
27.2k
    }
303
10.1k
    switch (expr->op()) {
304
4.44k
    case TExprOpcode::EQ:
305
4.44k
    case TExprOpcode::EQ_FOR_NULL:
306
4.99k
    case TExprOpcode::NE:
307
6.51k
    case TExprOpcode::GE:
308
8.13k
    case TExprOpcode::GT:
309
9.41k
    case TExprOpcode::LE:
310
10.0k
    case TExprOpcode::LT:
311
10.0k
        return true;
312
0
    default:
313
0
        return false;
314
10.1k
    }
315
10.1k
}
316
317
16
std::string TableColumnMapperOptions::debug_string() const {
318
16
    std::ostringstream out;
319
16
    out << "TableColumnMapperOptions{mode=" << mapping_mode_to_string(mode) << "}";
320
16
    return out.str();
321
16
}
322
323
11
std::string ColumnDefinition::debug_string() const {
324
11
    std::ostringstream out;
325
11
    out << "ColumnDefinition{name=" << name << ", identifier=" << field_debug_string(identifier)
326
11
        << ", name_mapping="
327
11
        << join_debug_strings(name_mapping, [](const std::string& name) { return name; })
328
11
        << ", local_id=" << local_id << ", type=" << data_type_debug_string(type) << ", children="
329
11
        << join_debug_strings(children,
330
11
                              [](const ColumnDefinition& child) { return child.debug_string(); })
331
11
        << ", has_default_expr=" << (default_expr != nullptr)
332
11
        << ", is_partition_key=" << is_partition_key << "}";
333
11
    return out.str();
334
11
}
335
336
2
std::string LocalColumnIndex::debug_string() const {
337
2
    std::ostringstream out;
338
2
    out << "LocalColumnIndex{index=" << index << ", project_all_children=" << project_all_children
339
2
        << ", children="
340
2
        << join_debug_strings(children,
341
2
                              [](const LocalColumnIndex& child) { return child.debug_string(); })
342
2
        << "}";
343
2
    return out.str();
344
2
}
345
346
12
std::string ColumnMapping::debug_string() const {
347
12
    std::ostringstream out;
348
12
    out << "ColumnMapping{global_index=" << global_index
349
12
        << ", table_column_name=" << table_column_name << ", file_local_id=";
350
12
    if (file_local_id.has_value()) {
351
7
        out << *file_local_id;
352
7
    } else {
353
5
        out << "null";
354
5
    }
355
12
    out << ", constant_index=";
356
12
    if (constant_index.has_value()) {
357
5
        out << *constant_index;
358
7
    } else {
359
7
        out << "null";
360
7
    }
361
12
    out << ", file_column_name=" << file_column_name
362
12
        << ", original_file_type=" << data_type_debug_string(original_file_type)
363
12
        << ", original_file_children="
364
12
        << join_debug_strings(original_file_children,
365
12
                              [](const ColumnDefinition& child) { return child.debug_string(); })
366
12
        << ", file_type=" << data_type_debug_string(file_type)
367
12
        << ", table_type=" << data_type_debug_string(table_type)
368
12
        << ", has_projection=" << (projection != nullptr) << ", child_mappings="
369
12
        << join_debug_strings(child_mappings,
370
12
                              [](const ColumnMapping& child) { return child.debug_string(); })
371
12
        << ", is_trivial=" << is_trivial << ", is_constant=" << constant_index.has_value()
372
12
        << ", filter_conversion=" << filter_conversion_type_to_string(filter_conversion)
373
12
        << ", virtual_column_type=" << virtual_column_type_to_string(virtual_column_type)
374
12
        << ", has_default_expr=" << (default_expr != nullptr) << "}";
375
12
    return out.str();
376
12
}
377
378
1
std::string TableColumnMapper::debug_string() const {
379
1
    std::ostringstream out;
380
1
    out << "TableColumnMapper{options=" << _options.debug_string() << ", mappings="
381
1
        << join_debug_strings(_mappings,
382
2
                              [](const ColumnMapping& mapping) { return mapping.debug_string(); })
383
1
        << ", hidden_mappings="
384
1
        << join_debug_strings(_hidden_mappings,
385
1
                              [](const ColumnMapping& mapping) { return mapping.debug_string(); })
386
1
        << ", constant_count=" << _constant_map.size() << "}";
387
1
    return out.str();
388
1
}
389
390
static const FileSlotRewriteInfo* find_slot_rewrite_info(
391
        const VExprSPtr& expr,
392
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
393
14.1k
        const VSlotRef** slot_ref) {
394
14.1k
    if (expr == nullptr) {
395
0
        return nullptr;
396
0
    }
397
14.1k
    VExprSPtr slot_expr = expr;
398
14.1k
    const bool input_is_cast = is_cast_expr(expr) && expr->get_num_children() == 1;
399
14.1k
    if (is_cast_expr(expr) && expr->get_num_children() == 1) {
400
264
        slot_expr = expr->children()[0];
401
264
    }
402
14.1k
    if (!slot_expr->is_slot_ref()) {
403
7.19k
        return nullptr;
404
7.19k
    }
405
6.94k
    const auto* candidate_slot_ref = assert_cast<const VSlotRef*>(slot_expr.get());
406
6.94k
    const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*candidate_slot_ref));
407
6.94k
    if (rewrite_it == global_to_file_slot.end()) {
408
0
        return nullptr;
409
0
    }
410
6.94k
    if (input_is_cast && !expr->data_type()->equals(*rewrite_it->second.table_type)) {
411
232
        return nullptr;
412
232
    }
413
6.71k
    if (slot_ref != nullptr) {
414
6.71k
        *slot_ref = candidate_slot_ref;
415
6.71k
    }
416
6.71k
    return &rewrite_it->second;
417
6.94k
}
418
419
332k
static bool filter_conversion_has_local_source(FilterConversionType conversion) {
420
332k
    switch (conversion) {
421
324k
    case FilterConversionType::COPY_DIRECTLY:
422
327k
    case FilterConversionType::CAST_FILTER:
423
332k
    case FilterConversionType::READER_EXPRESSION:
424
332k
        return true;
425
96
    case FilterConversionType::FINALIZE_ONLY:
426
96
    case FilterConversionType::CONSTANT:
427
96
        return false;
428
332k
    }
429
0
    return false;
430
332k
}
431
432
8.21k
static bool column_predicate_can_use_local_source(FilterConversionType conversion) {
433
8.21k
    switch (conversion) {
434
7.83k
    case FilterConversionType::COPY_DIRECTLY:
435
7.83k
        return true;
436
370
    case FilterConversionType::CAST_FILTER:
437
370
    case FilterConversionType::READER_EXPRESSION:
438
370
    case FilterConversionType::FINALIZE_ONLY:
439
370
    case FilterConversionType::CONSTANT:
440
370
        return false;
441
8.21k
    }
442
0
    return false;
443
8.21k
}
444
445
static bool table_filter_has_only_local_entries(
446
45.2k
        const TableFilter& table_filter, const std::map<GlobalIndex, FilterEntry>& filter_entries) {
447
46.1k
    for (const auto global_index : table_filter.global_indices) {
448
46.1k
        const auto entry_it = filter_entries.find(global_index);
449
46.1k
        if (entry_it == filter_entries.end() || !entry_it->second.is_local()) {
450
12.4k
            return false;
451
12.4k
        }
452
46.1k
    }
453
32.7k
    return true;
454
45.2k
}
455
456
static VExprSPtr unwrap_literal_for_file_cast(const VExprSPtr& expr,
457
7.23k
                                              const DataTypePtr& table_type) {
458
7.23k
    if (expr == nullptr) {
459
0
        return nullptr;
460
0
    }
461
7.23k
    if (expr->is_literal()) {
462
7.23k
        return expr;
463
7.23k
    }
464
6
    if (is_cast_expr(expr) && expr->get_num_children() == 1 && expr->children()[0]->is_literal() &&
465
6
        expr->children()[0]->data_type()->equals(*table_type)) {
466
0
        return expr->children()[0];
467
0
    }
468
6
    return nullptr;
469
6
}
470
471
0
static Field literal_field_from_expr(const VExpr& literal_expr) {
472
0
    DORIS_CHECK(literal_expr.is_literal());
473
0
    const auto* literal = dynamic_cast<const VLiteral*>(&literal_expr);
474
0
    DORIS_CHECK(literal != nullptr);
475
0
    Field field;
476
0
    literal->get_column_ptr()->get(0, field);
477
0
    return field;
478
0
}
479
480
// Table filter localization clones an already-prepared table expr and then rewrites it to file
481
// slots. Only split-local literals and BE cast nodes need table-reader-specific clone behavior;
482
// plain slot refs and literals use their own VExpr::clone_node().
483
174k
static Status clone_table_expr_node(const VExpr& expr, VExprSPtr* cloned_expr) {
484
174k
    DORIS_CHECK(cloned_expr != nullptr);
485
174k
    if (const auto* split_literal = dynamic_cast<const SplitLocalFileLiteral*>(&expr)) {
486
0
        *cloned_expr = std::make_shared<SplitLocalFileLiteral>(
487
0
                split_literal->data_type(), literal_field_from_expr(expr),
488
0
                split_literal->original_type(), split_literal->original_field());
489
174k
    } else if (const auto* vcast_expr = dynamic_cast<const VCastExpr*>(&expr);
490
174k
               vcast_expr != nullptr && vcast_expr->node_type() == TExprNodeType::CAST_EXPR) {
491
1.15k
        *cloned_expr = Cast::create_shared(vcast_expr->data_type());
492
1.15k
    }
493
174k
    return Status::OK();
494
174k
}
495
496
63.8k
Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr) {
497
63.8k
    DORIS_CHECK(cloned_expr != nullptr);
498
63.8k
    if (expr == nullptr) {
499
0
        *cloned_expr = nullptr;
500
0
        return Status::OK();
501
0
    }
502
63.8k
    return expr->deep_clone(cloned_expr, clone_table_expr_node);
503
63.8k
}
504
505
static VExprSPtr original_table_literal(const VExprSPtr& literal_expr,
506
7.26k
                                        RewriteContext* rewrite_context = nullptr) {
507
7.26k
    DORIS_CHECK(literal_expr != nullptr);
508
7.26k
    DORIS_CHECK(literal_expr->is_literal());
509
7.26k
    const auto* rewritten_literal = dynamic_cast<const SplitLocalFileLiteral*>(literal_expr.get());
510
7.26k
    if (rewritten_literal == nullptr) {
511
7.26k
        return literal_expr;
512
7.26k
    }
513
0
    auto literal = VLiteral::create_shared(rewritten_literal->original_type(),
514
0
                                           rewritten_literal->original_field());
515
0
    if (rewrite_context != nullptr) {
516
0
        rewrite_context->add_created_expr(literal);
517
0
    }
518
0
    return literal;
519
7.26k
}
520
521
23.6k
static ColumnDefinition hidden_column_from_slot_ref(const VSlotRef& slot_ref) {
522
23.6k
    ColumnDefinition column;
523
23.6k
    column.name = slot_ref.column_name();
524
23.6k
    column.identifier = Field::create_field<TYPE_STRING>(column.name);
525
23.6k
    column.type = slot_ref.data_type();
526
23.6k
    return column;
527
23.6k
}
528
529
static void collect_top_level_slot_columns(const VExprSPtr& expr,
530
76.0k
                                           std::map<GlobalIndex, ColumnDefinition>* columns) {
531
76.0k
    DORIS_CHECK(columns != nullptr);
532
76.0k
    if (expr == nullptr) {
533
0
        return;
534
0
    }
535
76.0k
    if (expr->is_slot_ref()) {
536
23.6k
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
537
23.6k
        columns->try_emplace(slot_ref_global_index(*slot_ref),
538
23.6k
                             hidden_column_from_slot_ref(*slot_ref));
539
23.6k
        return;
540
23.6k
    }
541
53.1k
    for (const auto& child : expr->children()) {
542
53.1k
        collect_top_level_slot_columns(child, columns);
543
53.1k
    }
544
52.4k
}
545
546
static VExprSPtr rewrite_literal_to_file_type(const VExprSPtr& literal_expr,
547
                                              const FileSlotRewriteInfo& rewrite_info,
548
7.23k
                                              RewriteContext* rewrite_context) {
549
7.23k
    DORIS_CHECK(literal_expr != nullptr);
550
7.23k
    DORIS_CHECK(literal_expr->is_literal());
551
7.23k
    const auto original_literal = original_table_literal(literal_expr, rewrite_context);
552
7.23k
    const Field original_field = literal_field(original_literal);
553
7.23k
    if (rewrite_info.file_type->equals(*original_literal->data_type())) {
554
2.12k
        return original_literal;
555
2.12k
    }
556
5.10k
    Field file_field;
557
5.10k
    try {
558
5.10k
        convert_field_to_type(original_field, *rewrite_info.file_type, &file_field,
559
5.10k
                              original_literal->data_type().get());
560
5.10k
    } catch (const Exception&) {
561
30
        return nullptr;
562
30
    }
563
5.07k
    if (file_field.is_null()) {
564
0
        return nullptr;
565
0
    }
566
5.07k
    if (file_field.get_type() != remove_nullable(rewrite_info.file_type)->get_primitive_type()) {
567
0
        return nullptr;
568
0
    }
569
5.07k
    auto literal = std::make_shared<SplitLocalFileLiteral>(
570
5.07k
            rewrite_info.file_type, file_field, original_literal->data_type(), original_field);
571
5.07k
    rewrite_context->add_created_expr(literal);
572
5.07k
    return literal;
573
5.07k
}
574
575
static bool rewrite_binary_slot_literal_predicate(
576
        const VExprSPtr& expr,
577
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
578
37.3k
        RewriteContext* rewrite_context) {
579
37.3k
    if (!is_binary_comparison_predicate(expr)) {
580
27.2k
        return false;
581
27.2k
    }
582
10.1k
    auto children = expr->children();
583
10.1k
    const VSlotRef* slot_ref = nullptr;
584
10.1k
    const FileSlotRewriteInfo* rewrite_info =
585
10.1k
            find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref);
586
10.1k
    int slot_child_idx = 0;
587
10.1k
    int literal_child_idx = 1;
588
10.1k
    if (rewrite_info == nullptr) {
589
3.71k
        rewrite_info = find_slot_rewrite_info(children[1], global_to_file_slot, &slot_ref);
590
3.71k
        slot_child_idx = 1;
591
3.71k
        literal_child_idx = 0;
592
3.71k
    }
593
10.1k
    if (rewrite_info == nullptr || slot_ref == nullptr) {
594
3.71k
        return false;
595
3.71k
    }
596
6.39k
    auto literal_expr =
597
6.39k
            unwrap_literal_for_file_cast(children[literal_child_idx], rewrite_info->table_type);
598
6.39k
    if (literal_expr == nullptr) {
599
6
        return false;
600
6
    }
601
602
6.38k
    auto rewritten_literal =
603
6.38k
            rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
604
6.38k
    if (rewritten_literal == nullptr) {
605
29
        children[literal_child_idx] = original_table_literal(literal_expr, rewrite_context);
606
29
        expr->set_children(std::move(children));
607
29
        return false;
608
29
    }
609
610
6.35k
    children[slot_child_idx] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context);
611
6.35k
    children[literal_child_idx] = std::move(rewritten_literal);
612
6.35k
    expr->set_children(std::move(children));
613
6.35k
    return true;
614
6.38k
}
615
616
static bool rewrite_in_slot_literal_predicate(
617
        const VExprSPtr& expr,
618
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
619
31.0k
        RewriteContext* rewrite_context) {
620
31.0k
    if (expr->node_type() != TExprNodeType::IN_PRED || expr->get_num_children() < 2) {
621
30.6k
        return false;
622
30.6k
    }
623
328
    auto children = expr->children();
624
328
    const VSlotRef* slot_ref = nullptr;
625
328
    const FileSlotRewriteInfo* rewrite_info =
626
328
            find_slot_rewrite_info(children[0], global_to_file_slot, &slot_ref);
627
328
    if (rewrite_info == nullptr || slot_ref == nullptr) {
628
2
        return false;
629
2
    }
630
631
326
    VExprSPtrs rewritten_literals;
632
326
    rewritten_literals.reserve(children.size() - 1);
633
1.17k
    for (size_t child_idx = 1; child_idx < children.size(); ++child_idx) {
634
849
        auto literal_expr =
635
849
                unwrap_literal_for_file_cast(children[child_idx], rewrite_info->table_type);
636
849
        if (literal_expr == nullptr) {
637
0
            return false;
638
0
        }
639
849
        auto rewritten_literal =
640
849
                rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
641
849
        if (rewritten_literal == nullptr) {
642
3
            for (size_t restore_idx = 1; restore_idx < children.size(); ++restore_idx) {
643
2
                auto restore_literal = unwrap_literal_for_file_cast(children[restore_idx],
644
2
                                                                    rewrite_info->table_type);
645
2
                if (restore_literal != nullptr) {
646
2
                    children[restore_idx] =
647
2
                            original_table_literal(restore_literal, rewrite_context);
648
2
                }
649
2
            }
650
1
            expr->set_children(std::move(children));
651
1
            return false;
652
1
        }
653
848
        rewritten_literals.push_back(std::move(rewritten_literal));
654
848
    }
655
656
325
    children[0] = create_file_slot_ref(*slot_ref, *rewrite_info, rewrite_context);
657
1.17k
    for (size_t literal_idx = 0; literal_idx < rewritten_literals.size(); ++literal_idx) {
658
848
        children[literal_idx + 1] = std::move(rewritten_literals[literal_idx]);
659
848
    }
660
325
    expr->set_children(std::move(children));
661
325
    return true;
662
326
}
663
664
static VExprSPtr create_file_struct_child_name_literal(const std::string& file_child_name,
665
611
                                                       RewriteContext* rewrite_context) {
666
611
    auto literal = VLiteral::create_shared(std::make_shared<DataTypeString>(),
667
611
                                           Field::create_field<TYPE_STRING>(file_child_name));
668
611
    rewrite_context->add_created_expr(literal);
669
611
    return literal;
670
611
}
671
672
static bool needs_complex_file_slot_cast(const DataTypePtr& file_type,
673
733
                                         const DataTypePtr& table_type) {
674
733
    if (file_type == nullptr || table_type == nullptr || file_type->equals(*table_type)) {
675
0
        return false;
676
0
    }
677
733
    const auto file_nested_type = remove_nullable(file_type);
678
733
    const auto table_nested_type = remove_nullable(table_type);
679
733
    if (file_nested_type->equals(*table_nested_type)) {
680
0
        return false;
681
0
    }
682
733
    return is_complex_type(file_nested_type->get_primitive_type()) ||
683
733
           is_complex_type(table_nested_type->get_primitive_type());
684
733
}
685
686
611
static bool collect_struct_element_chain(const VExprSPtr& expr, std::vector<VExprSPtr>* chain) {
687
611
    DORIS_CHECK(chain != nullptr);
688
611
    if (!is_struct_element_expr(expr)) {
689
0
        return false;
690
0
    }
691
611
    const auto& parent = expr->children()[0];
692
611
    if (is_struct_element_expr(parent)) {
693
81
        if (!collect_struct_element_chain(parent, chain)) {
694
0
            return false;
695
0
        }
696
530
    } else if (!parent->is_slot_ref()) {
697
        // Only support file-local rewrite for struct child chains rooted directly at a top-level
698
        // slot, for example `element_at(s, 'a')` or `element_at(element_at(s, 'a'), 'b')`.
699
        //
700
        // Do not localize computed complex parents such as
701
        // `element_at(element_at(map_values(m), 1), 'full_name')`. The intermediate map/array
702
        // result has already been reshaped by scan projection and may have a different child order
703
        // from the table expression. Partially rewriting that expression against the file block can
704
        // silently evaluate the wrong struct child and filter out valid rows. Those predicates must
705
        // remain as table-level conjuncts and be evaluated after TableReader materialization.
706
0
        return false;
707
0
    }
708
611
    chain->push_back(expr);
709
611
    return true;
710
611
}
711
712
static bool rewrite_struct_element_path_to_file_expr(
713
        const VExprSPtr& expr, const std::vector<ColumnMapping>& mappings,
714
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
715
1.51k
        RewriteContext* rewrite_context) {
716
1.51k
    ResolvedNestedStructPath resolved;
717
1.51k
    if (!resolve_nested_struct_expr_for_file(expr, mappings, &resolved)) {
718
987
        return false;
719
987
    }
720
721
530
    std::vector<VExprSPtr> struct_element_chain;
722
530
    if (!collect_struct_element_chain(expr, &struct_element_chain) ||
723
530
        struct_element_chain.size() != resolved.file_child_names.size() ||
724
530
        struct_element_chain.size() != resolved.file_child_types.size()) {
725
0
        return false;
726
0
    }
727
728
530
    auto root_children = struct_element_chain.front()->children();
729
530
    if (!root_children[0]->is_slot_ref()) {
730
0
        return false;
731
0
    }
732
530
    const auto* slot_ref = assert_cast<const VSlotRef*>(root_children[0].get());
733
530
    const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
734
530
    if (rewrite_it == global_to_file_slot.end()) {
735
0
        return false;
736
0
    }
737
738
    // File-local conjuncts are prepared against the file-reader Block, so both the root slot and
739
    // every struct selector must be expressed in file schema terms. For a renamed Iceberg field,
740
    // keeping the table selector would prepare `element_at(file_struct<rename>, 'renamed')` and
741
    // fail before any rows are read. Rewrite the whole chain while ColumnMapping still preserves
742
    // the table-to-file relationship. Example:
743
    //   table filter: element_at(element_at(s, 'renamed_parent'), 'renamed_leaf')
744
    //   old file:     s<parent<leaf>>
745
    //   file filter:  element_at(element_at(s, 'parent'), 'leaf')
746
530
    root_children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
747
530
    struct_element_chain.front()->set_children(std::move(root_children));
748
1.14k
    for (size_t idx = 0; idx < struct_element_chain.size(); ++idx) {
749
611
        auto children = struct_element_chain[idx]->children();
750
611
        children[1] = create_file_struct_child_name_literal(resolved.file_child_names[idx],
751
611
                                                            rewrite_context);
752
611
        struct_element_chain[idx]->set_children(std::move(children));
753
        // The selector name and the expression return type must be moved to file schema together.
754
        // Example:
755
        //   table filter: element_at(element_at(s, 'new_a'), 'new_aa') = 50
756
        //   old file:     s.new_a STRUCT<aa, bb>
757
        //   file filter:  element_at(element_at(s, 'new_a'), 'aa') = 50
758
        //
759
        // If the inner element_at keeps the table return type STRUCT<new_aa, bb>, preparing the
760
        // outer element_at(..., 'aa') fails before scanning because `aa` is not a table field.
761
611
        struct_element_chain[idx]->data_type() = resolved.file_child_types[idx];
762
611
    }
763
530
    return true;
764
530
}
765
766
static VExprSPtr rewrite_table_expr_to_file_expr(
767
        const VExprSPtr& expr,
768
        const std::map<GlobalIndex, FileSlotRewriteInfo>& global_to_file_slot,
769
        const std::vector<ColumnMapping>& filter_mappings, RewriteContext* rewrite_context,
770
40.5k
        bool* can_localize) {
771
40.5k
    if (expr == nullptr) {
772
0
        return nullptr;
773
0
    }
774
40.5k
    DORIS_CHECK(rewrite_context != nullptr);
775
40.5k
    DORIS_CHECK(can_localize != nullptr);
776
40.5k
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr.get());
777
40.5k
        runtime_filter != nullptr) {
778
3.18k
        auto impl = runtime_filter->get_impl();
779
3.18k
        if (impl == nullptr) {
780
0
            *can_localize = false;
781
0
            return expr;
782
0
        }
783
3.18k
        auto localized_impl = rewrite_table_expr_to_file_expr(
784
3.18k
                impl, global_to_file_slot, filter_mappings, rewrite_context, can_localize);
785
3.18k
        if (!*can_localize) {
786
0
            return expr;
787
0
        }
788
3.18k
        runtime_filter->set_impl(std::move(localized_impl));
789
3.18k
        return expr;
790
3.18k
    }
791
37.3k
    if (rewrite_binary_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) {
792
6.35k
        return expr;
793
6.35k
    }
794
30.9k
    if (rewrite_in_slot_literal_predicate(expr, global_to_file_slot, rewrite_context)) {
795
323
        return expr;
796
323
    }
797
30.6k
    if (is_struct_element_expr(expr)) {
798
1.51k
        if (!rewrite_struct_element_path_to_file_expr(expr, filter_mappings, global_to_file_slot,
799
1.51k
                                                      rewrite_context)) {
800
            // The scanner still evaluates the original table-level conjunct after TableReader
801
            // finalizes the output block. Skipping an unlocalizable file conjunct is therefore
802
            // safer than preparing a partially rewritten expression against the wrong struct
803
            // layout. In particular, do not generate file-local conjuncts for computed complex
804
            // parents such as `element_at(element_at(map_values(m), 1), 'field')`; only direct
805
            // slot-rooted struct chains are supported here.
806
987
            *can_localize = false;
807
987
        }
808
1.51k
        return expr;
809
1.51k
    }
810
29.1k
    if (expr->is_slot_ref()) {
811
8.72k
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
812
8.72k
        const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
813
8.73k
        if (rewrite_it != global_to_file_slot.end()) {
814
8.73k
            const auto& rewrite_info = rewrite_it->second;
815
8.73k
            auto file_slot = create_file_slot_ref(*slot_ref, rewrite_info, rewrite_context);
816
8.73k
            if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
817
7.99k
                return file_slot;
818
7.99k
            }
819
740
            if (needs_complex_file_slot_cast(rewrite_info.file_type, rewrite_info.table_type)) {
820
                // Generic file-local expressions cannot safely cast an evolved complex file slot
821
                // back to the table type. Example:
822
                //
823
                //   table filter: ARRAY_CONTAINS(MAP_KEYS(m), 'person5')
824
                //   old file:     m MAP<STRING, STRUCT<name, age>>
825
                //   table:        m MAP<STRING, STRUCT<age, full_name, gender>>
826
                //
827
                // Although MAP_KEYS only reads the key column, wrapping the file slot as
828
                // `CAST(file_m AS table_m)` forces the value struct cast first and fails because
829
                // the old and new value structs have different fields. Keep such filters at the
830
                // table level, where TableReader materializes the evolved complex value before
831
                // Scanner evaluates the original conjunct. Direct slot-rooted struct child paths
832
                // are handled by rewrite_struct_element_path_to_file_expr() above.
833
295
                *can_localize = false;
834
295
                return expr;
835
295
            }
836
445
            auto cast_expr = Cast::create_shared(rewrite_info.table_type);
837
445
            cast_expr->add_child(std::move(file_slot));
838
445
            rewrite_context->add_created_expr(cast_expr);
839
445
            return cast_expr;
840
740
        }
841
18.4E
        return expr;
842
8.72k
    }
843
    // The input is a split-local cloned tree. A previous split-local clone may already have
844
    // inserted Cast(slot). Keep that rewrite idempotent: rewrite the cast child from table slot to
845
    // the current split's file slot, and drop the cast when the current split no longer needs it.
846
20.4k
    if (is_cast_expr(expr) && expr->get_num_children() == 1) {
847
951
        const auto& child = expr->children()[0];
848
951
        if (child->is_slot_ref()) {
849
919
            const auto* slot_ref = assert_cast<const VSlotRef*>(child.get());
850
919
            const auto rewrite_it = global_to_file_slot.find(slot_ref_global_index(*slot_ref));
851
919
            if (rewrite_it != global_to_file_slot.end() &&
852
919
                expr->data_type()->equals(*rewrite_it->second.table_type)) {
853
1
                auto rewritten_child =
854
1
                        create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
855
1
                if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
856
0
                    return rewritten_child;
857
0
                }
858
1
                if (needs_complex_file_slot_cast(rewrite_it->second.file_type,
859
1
                                                 rewrite_it->second.table_type)) {
860
0
                    *can_localize = false;
861
0
                    return expr;
862
0
                }
863
1
                expr->set_children({std::move(rewritten_child)});
864
1
                return expr;
865
1
            }
866
919
        }
867
951
    }
868
869
20.4k
    VExprSPtrs rewritten_children;
870
20.4k
    rewritten_children.reserve(expr->children().size());
871
20.8k
    for (const auto& child : expr->children()) {
872
20.8k
        rewritten_children.push_back(rewrite_table_expr_to_file_expr(
873
20.8k
                child, global_to_file_slot, filter_mappings, rewrite_context, can_localize));
874
20.8k
    }
875
20.4k
    expr->set_children(std::move(rewritten_children));
876
20.4k
    return expr;
877
20.4k
}
878
879
static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
880
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = "_last_updated_sequence_number";
881
static constexpr int32_t ROW_LINEAGE_ROW_ID_FIELD_ID = 2147483540;
882
static constexpr int32_t ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER_FIELD_ID = 2147483539;
883
884
255k
static TableVirtualColumnType row_lineage_virtual_column_type(const std::string& column_name) {
885
255k
    if (column_name == ROW_LINEAGE_ROW_ID) {
886
2
        return TableVirtualColumnType::ROW_ID;
887
2
    }
888
255k
    if (column_name == ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER) {
889
2
        return TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
890
2
    }
891
255k
    return TableVirtualColumnType::INVALID;
892
255k
}
893
894
static TableVirtualColumnType row_lineage_virtual_column_type_by_field_id(
895
61.3k
        const ColumnDefinition& column) {
896
61.3k
    if (!column.has_identifier_field_id()) {
897
251
        return TableVirtualColumnType::INVALID;
898
251
    }
899
61.1k
    switch (column.get_identifier_field_id()) {
900
262
    case ROW_LINEAGE_ROW_ID_FIELD_ID:
901
262
        return TableVirtualColumnType::ROW_ID;
902
203
    case ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER_FIELD_ID:
903
203
        return TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER;
904
60.5k
    default:
905
60.5k
        return TableVirtualColumnType::INVALID;
906
61.1k
    }
907
61.1k
}
908
909
static TableVirtualColumnType row_lineage_virtual_column_type(const ColumnDefinition& column,
910
317k
                                                              TableColumnMappingMode mode) {
911
317k
    switch (mode) {
912
61.3k
    case TableColumnMappingMode::BY_FIELD_ID:
913
61.3k
        return row_lineage_virtual_column_type_by_field_id(column);
914
254k
    case TableColumnMappingMode::BY_NAME:
915
255k
    case TableColumnMappingMode::BY_INDEX:
916
255k
        return row_lineage_virtual_column_type(column.name);
917
317k
    }
918
0
    return TableVirtualColumnType::INVALID;
919
317k
}
920
921
// Returns true when the current file type is not the exact nested type the scan should expose.
922
// This is about building the projected file-side type/projection, not about whether TableReader
923
// later needs to rematerialize the complex value back to table layout.
924
441k
static bool needs_projected_file_type_rebuild(const ColumnMapping& mapping) {
925
441k
    if (!is_complex_type(mapping.file_type->get_primitive_type())) {
926
251k
        return false;
927
251k
    }
928
189k
    if (mapping.child_mappings.empty()) {
929
0
        return false;
930
0
    }
931
189k
    DORIS_CHECK(mapping.file_type != nullptr);
932
189k
    DORIS_CHECK(mapping.table_type != nullptr);
933
189k
    if (remove_nullable(mapping.file_type)->get_primitive_type() !=
934
189k
        remove_nullable(mapping.table_type)->get_primitive_type()) {
935
0
        return true;
936
0
    }
937
189k
    if (!mapping.table_type->equals(*mapping.file_type)) {
938
6.76k
        return true;
939
6.76k
    }
940
275k
    for (const auto& child_mapping : mapping.child_mappings) {
941
        // Rename-only child mappings do not change the file-side projected shape. If field-id
942
        // matching maps table child `renamed_b` to file child `b`, the file reader can still expose
943
        // the original file type as long as child count/order/types are unchanged.
944
275k
        if (!child_mapping.file_local_id.has_value() ||
945
275k
            needs_projected_file_type_rebuild(child_mapping)) {
946
331
            return true;
947
331
        }
948
275k
    }
949
182k
    return false;
950
182k
}
951
952
static std::optional<size_t> file_child_ordinal_in_scan_type(const ColumnMapping& mapping,
953
248k
                                                             const ColumnMapping& child_mapping) {
954
248k
    if (!child_mapping.file_local_id.has_value()) {
955
121
        return std::nullopt;
956
121
    }
957
248k
    const auto& file_children = !mapping.projected_file_children.empty()
958
248k
                                        ? mapping.projected_file_children
959
18.4E
                                        : mapping.original_file_children;
960
343k
    const auto child_it = std::ranges::find_if(file_children, [&](const ColumnDefinition& child) {
961
343k
        return child.file_local_id() == *child_mapping.file_local_id;
962
343k
    });
963
248k
    if (child_it == file_children.end()) {
964
0
        return std::nullopt;
965
0
    }
966
248k
    return static_cast<size_t>(std::distance(file_children.begin(), child_it));
967
248k
}
968
969
941k
static bool needs_complex_rematerialize(const ColumnMapping& mapping) {
970
941k
    if (mapping.child_mappings.empty()) {
971
773k
        return false;
972
773k
    }
973
167k
    if (mapping.table_type == nullptr || mapping.file_type == nullptr ||
974
167k
        !mapping.table_type->equals(*mapping.file_type)) {
975
3.54k
        return true;
976
3.54k
    }
977
411k
    for (size_t table_child_idx = 0; table_child_idx < mapping.child_mappings.size();
978
248k
         ++table_child_idx) {
979
248k
        const auto& child_mapping = mapping.child_mappings[table_child_idx];
980
248k
        const auto file_child_idx = file_child_ordinal_in_scan_type(mapping, child_mapping);
981
248k
        if (!file_child_idx.has_value() || *file_child_idx != table_child_idx ||
982
248k
            needs_complex_rematerialize(child_mapping) ||
983
248k
            (child_mapping.table_type != nullptr && child_mapping.file_type != nullptr &&
984
247k
             !child_mapping.table_type->equals(*child_mapping.file_type))) {
985
876
            return true;
986
876
        }
987
248k
    }
988
163k
    return false;
989
164k
}
990
991
706k
static bool mapping_can_use_file_column_directly(const ColumnMapping& mapping) {
992
706k
    if (mapping.table_type == nullptr || mapping.file_type == nullptr) {
993
0
        return false;
994
0
    }
995
706k
    const auto table_type = remove_nullable(mapping.table_type);
996
706k
    const auto file_type = remove_nullable(mapping.file_type);
997
706k
    const bool same_timestamptz_with_different_scale =
998
706k
            table_type->get_primitive_type() == TYPE_TIMESTAMPTZ &&
999
706k
            file_type->get_primitive_type() == TYPE_TIMESTAMPTZ;
1000
706k
    if (!mapping.table_type->equals(*mapping.file_type) && !same_timestamptz_with_different_scale) {
1001
18.1k
        return false;
1002
18.1k
    }
1003
688k
    return !needs_complex_rematerialize(mapping);
1004
706k
}
1005
1006
static const ColumnDefinition* find_file_child_for_mapping(const ColumnDefinition& table_child,
1007
                                                           const ColumnDefinition& file_parent,
1008
                                                           TableColumnMappingMode mode,
1009
                                                           size_t table_child_idx,
1010
254k
                                                           bool allow_ordinal_fallback) {
1011
254k
    const auto file_parent_type = remove_nullable(file_parent.type)->get_primitive_type();
1012
254k
    switch (file_parent_type) {
1013
74.1k
    case TYPE_ARRAY:
1014
74.1k
        DORIS_CHECK(file_parent.children.size() == 1);
1015
74.1k
        return &file_parent.children[0];
1016
117k
    case TYPE_MAP:
1017
117k
        DORIS_CHECK(file_parent.children.size() == 2);
1018
117k
        if (table_child.name == "key") {
1019
58.9k
            return &file_parent.children[0];
1020
58.9k
        }
1021
58.9k
        if (table_child.name == "value") {
1022
58.9k
            return &file_parent.children[1];
1023
58.9k
        }
1024
18.4E
        if (table_child.local_id == 0 || table_child.local_id == 1) {
1025
0
            return &file_parent.children[table_child.local_id];
1026
0
        }
1027
18.4E
        return nullptr;
1028
62.8k
    default:
1029
        // Hive BY_INDEX is a top-level column matching rule. Once a complex root is selected by
1030
        // file position, nested struct children follow Hive reader's historical name matching
1031
        // semantics; their integer identifiers can be field ids, not file positions.
1032
62.8k
        const auto nested_mode =
1033
62.8k
                mode == TableColumnMappingMode::BY_INDEX ? TableColumnMappingMode::BY_NAME : mode;
1034
62.8k
        if (const auto* file_child =
1035
62.8k
                    matcher_for_mode(nested_mode).find(table_child, file_parent.children);
1036
62.8k
            file_child != nullptr) {
1037
60.3k
            return file_child;
1038
60.3k
        }
1039
2.50k
        if (allow_ordinal_fallback && mode == TableColumnMappingMode::BY_FIELD_ID &&
1040
2.50k
            !table_child.has_identifier_field_id()) {
1041
            // Synthetic children are derived from the table DataType when nested ColumnDefinition
1042
            // metadata has been pruned away. They do not carry Iceberg field ids, so try a name
1043
            // match before falling back to ordinal order. Example:
1044
            //   table value type: Struct(age, full_name, gender)
1045
            //   old file value:   Struct(name, age)
1046
            // Name matching keeps `age -> age`; the later unused-child fallback can then map the
1047
            // renamed `full_name -> name` instead of consuming `age` twice.
1048
3
            if (const auto* file_child = NameMatcher().find(table_child, file_parent.children);
1049
3
                file_child != nullptr) {
1050
1
                return file_child;
1051
1
            }
1052
3
        }
1053
        // Some callers only carry the full complex DataType for a projected table column, without
1054
        // expanded nested ColumnDefinitions. In that case we can still preserve full materialization
1055
        // by walking table/file struct fields by ordinal. This is a fallback only: explicit
1056
        // ColumnDefinition children keep using the requested table-format matching rule, which is
1057
        // required for precise schema evolution.
1058
2.50k
        if (allow_ordinal_fallback && table_child_idx < file_parent.children.size()) {
1059
3
            return &file_parent.children[table_child_idx];
1060
3
        }
1061
2.50k
        return nullptr;
1062
254k
    }
1063
254k
}
1064
1065
static ColumnDefinition synthetic_child_definition(const std::string& name, DataTypePtr type,
1066
13
                                                   int32_t local_id) {
1067
13
    ColumnDefinition child;
1068
13
    child.identifier = Field::create_field<TYPE_STRING>(name);
1069
13
    child.local_id = local_id;
1070
13
    child.name = name;
1071
13
    child.type = std::move(type);
1072
13
    return child;
1073
13
}
1074
1075
static std::vector<ColumnDefinition> synthesize_complex_children_from_type(
1076
4
        const DataTypePtr& type) {
1077
4
    std::vector<ColumnDefinition> children;
1078
4
    if (type == nullptr) {
1079
0
        return children;
1080
0
    }
1081
4
    const auto nested_type = remove_nullable(type);
1082
4
    switch (nested_type->get_primitive_type()) {
1083
0
    case TYPE_ARRAY: {
1084
0
        const auto* array_type = assert_cast<const DataTypeArray*>(nested_type.get());
1085
0
        children.push_back(synthetic_child_definition("element", array_type->get_nested_type(), 0));
1086
0
        break;
1087
0
    }
1088
1
    case TYPE_MAP: {
1089
1
        const auto* map_type = assert_cast<const DataTypeMap*>(nested_type.get());
1090
1
        children.push_back(synthetic_child_definition("key", map_type->get_key_type(), 0));
1091
1
        children.push_back(synthetic_child_definition("value", map_type->get_value_type(), 1));
1092
1
        break;
1093
0
    }
1094
3
    case TYPE_STRUCT: {
1095
3
        const auto* struct_type = assert_cast<const DataTypeStruct*>(nested_type.get());
1096
3
        children.reserve(struct_type->get_elements().size());
1097
12
        for (size_t idx = 0; idx < struct_type->get_elements().size(); ++idx) {
1098
9
            children.push_back(synthetic_child_definition(struct_type->get_element_name(idx),
1099
9
                                                          struct_type->get_element(idx),
1100
9
                                                          cast_set<int32_t>(idx)));
1101
9
        }
1102
3
        break;
1103
0
    }
1104
0
    default:
1105
0
        break;
1106
4
    }
1107
4
    return children;
1108
4
}
1109
1110
static bool has_table_child_named(const std::vector<ColumnDefinition>& children,
1111
2.76k
                                  std::string_view name) {
1112
4.13k
    return std::ranges::any_of(children, [&](const ColumnDefinition& child) {
1113
4.13k
        return std::string_view(child.name) == name;
1114
4.13k
    });
1115
2.76k
}
1116
1117
static void complete_required_complex_children_from_type(const DataTypePtr& type,
1118
6.64k
                                                         std::vector<ColumnDefinition>* children) {
1119
6.64k
    DORIS_CHECK(children != nullptr);
1120
6.64k
    if (type == nullptr) {
1121
0
        return;
1122
0
    }
1123
6.64k
    const auto nested_type = remove_nullable(type);
1124
6.64k
    switch (nested_type->get_primitive_type()) {
1125
1.38k
    case TYPE_MAP: {
1126
1.38k
        const auto* map_type = assert_cast<const DataTypeMap*>(nested_type.get());
1127
        // MAP key/value are structural children, not independently materializable table fields.
1128
        // A key-only projection can still be attached to a whole-map output root, for example:
1129
        //   SELECT * FROM t WHERE ARRAY_CONTAINS(MAP_KEYS(new_map_column), 'person5')
1130
        //
1131
        // In that shape the scanner keeps the value stream readable, but the table projection can
1132
        // carry only the key child. Add the missing value child so recursive mapping can evolve the
1133
        // value type instead of letting TableReader cast old/new value structs directly.
1134
1.38k
        if (has_table_child_named(*children, "key") && !has_table_child_named(*children, "value")) {
1135
2
            children->push_back(synthetic_child_definition("value", map_type->get_value_type(), 1));
1136
2
        }
1137
1.38k
        break;
1138
0
    }
1139
1.60k
    case TYPE_ARRAY:
1140
        // ARRAY has only one required structural child (`element`), so a non-empty projection is
1141
        // already rooted at the element path.
1142
1.60k
        break;
1143
3.65k
    case TYPE_STRUCT:
1144
        // STRUCT children are real fields and must remain prunable. Completing missing struct
1145
        // fields here would turn `SELECT s.a` into a full-struct read and undo nested projection.
1146
3.65k
        break;
1147
0
    default:
1148
0
        break;
1149
6.64k
    }
1150
6.64k
}
1151
1152
166k
static Status validate_file_schema_children(const ColumnDefinition& file_field) {
1153
166k
    if (file_field.type == nullptr) {
1154
0
        return Status::InternalError("File column '{}' has null type", file_field.name);
1155
0
    }
1156
166k
    const auto nested_type = remove_nullable(file_field.type);
1157
166k
    size_t expected_children = 0;
1158
166k
    bool complex_with_fixed_children = true;
1159
166k
    switch (nested_type->get_primitive_type()) {
1160
74.1k
    case TYPE_ARRAY:
1161
74.1k
        expected_children = 1;
1162
74.1k
        break;
1163
58.9k
    case TYPE_MAP:
1164
58.9k
        expected_children = 2;
1165
58.9k
        break;
1166
33.1k
    case TYPE_STRUCT:
1167
33.1k
        expected_children =
1168
33.1k
                assert_cast<const DataTypeStruct*>(nested_type.get())->get_elements().size();
1169
33.1k
        break;
1170
0
    default:
1171
0
        complex_with_fixed_children = false;
1172
0
        break;
1173
166k
    }
1174
166k
    if (!complex_with_fixed_children || file_field.children.size() == expected_children) {
1175
166k
        return Status::OK();
1176
166k
    }
1177
3
    return Status::InternalError(
1178
3
            "Malformed complex file schema for column '{}': type={}, expected_children={}, "
1179
3
            "actual_children={}",
1180
3
            file_field.name, file_field.type->get_name(), expected_children,
1181
3
            file_field.children.size());
1182
166k
}
1183
1184
175k
static bool has_projected_file_children(const ColumnMapping& mapping) {
1185
175k
    if (mapping.original_file_children.empty() || mapping.projected_file_children.empty()) {
1186
135k
        return false;
1187
135k
    }
1188
40.3k
    if (mapping.original_file_children.size() != mapping.projected_file_children.size()) {
1189
2.53k
        return true;
1190
2.53k
    }
1191
99.6k
    for (size_t idx = 0; idx < mapping.original_file_children.size(); ++idx) {
1192
61.9k
        if (mapping.original_file_children[idx].file_local_id() !=
1193
61.9k
            mapping.projected_file_children[idx].file_local_id()) {
1194
0
            return true;
1195
0
        }
1196
61.9k
    }
1197
37.7k
    return false;
1198
37.7k
}
1199
1200
175k
static bool needs_nested_file_projection(const ColumnMapping& mapping) {
1201
175k
    if (has_projected_file_children(mapping)) {
1202
        // Return True if the projected child column is missing / re-ordered
1203
2.53k
        return true;
1204
2.53k
    }
1205
173k
    return std::ranges::any_of(mapping.child_mappings, [](const ColumnMapping& child_mapping) {
1206
62.8k
        return needs_nested_file_projection(child_mapping);
1207
62.8k
    });
1208
175k
}
1209
1210
static Status build_complex_projection(const ColumnMapping& mapping, LocalColumnIndex* projection);
1211
1212
// Build the projected file children/type according to the pruned complex projection. For example,
1213
// if we have a struct column `s` with children `id` and `name`, and the projection only keeps
1214
// `s.name`, then the file reader should expose `STRUCT<name ...>`.
1215
static Status rebuild_projected_file_children_and_type(
1216
        const DataTypePtr& file_type, const std::vector<ColumnDefinition>& original_file_children,
1217
        const std::vector<ColumnMapping>& child_mappings,
1218
6.94k
        std::vector<ColumnDefinition>* projected_file_children, DataTypePtr* projected_type) {
1219
6.94k
    DORIS_CHECK(file_type != nullptr);
1220
6.94k
    DORIS_CHECK(projected_file_children != nullptr);
1221
6.94k
    DORIS_CHECK(projected_type != nullptr);
1222
6.94k
    ColumnDefinition field;
1223
6.94k
    field.type = file_type;
1224
6.94k
    field.children = original_file_children;
1225
6.94k
    LocalColumnIndex projection = LocalColumnIndex::partial_local(-1);
1226
6.94k
    projection.children.reserve(child_mappings.size());
1227
9.89k
    for (const auto* child_mapping : present_child_mappings_in_file_order(child_mappings)) {
1228
9.89k
        DORIS_CHECK(child_mapping->file_local_id.has_value());
1229
9.89k
        LocalColumnIndex child_projection;
1230
9.89k
        RETURN_IF_ERROR(build_complex_projection(*child_mapping, &child_projection));
1231
9.89k
        projection.children.push_back(std::move(child_projection));
1232
9.89k
    }
1233
1234
6.94k
    ColumnDefinition projected_field;
1235
6.94k
    RETURN_IF_ERROR(project_column_definition(field, projection, &projected_field));
1236
6.94k
    *projected_file_children = std::move(projected_field.children);
1237
6.94k
    *projected_type = std::move(projected_field.type);
1238
6.94k
    return Status::OK();
1239
6.94k
}
1240
1241
// Build the complex column projection according to the ColumnMapping which is re-ordered by the
1242
// file-schema's order.
1243
//
1244
// For MAP, a partial projection represents value-subtree pruning only. The key child is not a
1245
// projected output shape; file readers still read full keys to construct ColumnMap offsets and keep
1246
// key semantics unchanged. If a caller tries to project only/prune the key child, the common schema
1247
// projection helper rejects it.
1248
20.5k
static Status build_complex_projection(const ColumnMapping& mapping, LocalColumnIndex* projection) {
1249
20.5k
    if (projection == nullptr) {
1250
0
        return Status::InvalidArgument("projection is null");
1251
0
    }
1252
20.5k
    DORIS_CHECK(mapping.file_local_id.has_value());
1253
20.5k
    *projection = LocalColumnIndex::local(*mapping.file_local_id);
1254
20.5k
    projection->project_all_children = mapping.child_mappings.empty();
1255
20.5k
    projection->children.clear();
1256
20.5k
    const auto present_children = present_child_mappings_in_file_order(mapping.child_mappings);
1257
20.5k
    if (!projection->project_all_children && present_children.empty()) {
1258
        // All requested table children under this complex node are missing/default-only. The file
1259
        // reader cannot expose an empty complex projection, but TableReader can still rematerialize
1260
        // the table shape from a full file subtree and fill the missing children with defaults.
1261
120
        projection->project_all_children = true;
1262
120
        return Status::OK();
1263
120
    }
1264
20.4k
    for (const auto* child_mapping : present_children) {
1265
8.13k
        LocalColumnIndex child_projection;
1266
8.13k
        RETURN_IF_ERROR(build_complex_projection(*child_mapping, &child_projection));
1267
8.13k
        projection->children.push_back(std::move(child_projection));
1268
8.13k
    }
1269
20.4k
    if (!projection->project_all_children && projection->children.empty()) {
1270
0
        return Status::NotSupported("Projection for complex column {} contains no file children",
1271
0
                                    mapping.file_column_name);
1272
0
    }
1273
20.4k
    return Status::OK();
1274
20.4k
}
1275
1276
using FilterProjectionMap = std::map<LocalColumnId, LocalColumnIndex>;
1277
1278
// Update the mapping's file type according to the projection, and determine whether the projection
1279
// is trivial (i.e. the projected file type is the same as the table type, so no need to
1280
// rematerialize the complex value back to table layout after reading from file).
1281
static Status apply_projection_to_mapping_file_type(const LocalColumnIndex& projection,
1282
302k
                                                    ColumnMapping* mapping) {
1283
302k
    DORIS_CHECK(mapping != nullptr);
1284
302k
    if (mapping->original_file_type == nullptr) {
1285
0
        mapping->original_file_type = mapping->file_type;
1286
0
    }
1287
302k
    if (mapping->original_file_type == nullptr ||
1288
302k
        !is_complex_type(remove_nullable(mapping->original_file_type)->get_primitive_type())) {
1289
156k
        return Status::OK();
1290
156k
    }
1291
145k
    ColumnDefinition field;
1292
145k
    field.type = mapping->original_file_type;
1293
145k
    field.children = mapping->original_file_children;
1294
145k
    ColumnDefinition projected_field;
1295
145k
    RETURN_IF_ERROR(project_column_definition(field, projection, &projected_field));
1296
145k
    mapping->file_type = std::move(projected_field.type);
1297
145k
    mapping->projected_file_children = std::move(projected_field.children);
1298
145k
    mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
1299
145k
    return Status::OK();
1300
145k
}
1301
1302
static Status merge_filter_projection(const FilterProjectionMap* filter_projections,
1303
16.7k
                                      LocalColumnIndex* projection) {
1304
16.7k
    DORIS_CHECK(projection != nullptr);
1305
16.7k
    if (filter_projections == nullptr) {
1306
0
        return Status::OK();
1307
0
    }
1308
16.7k
    const auto filter_projection_it = filter_projections->find(projection->column_id());
1309
16.7k
    if (filter_projection_it == filter_projections->end()) {
1310
16.1k
        return Status::OK();
1311
16.1k
    }
1312
    // Merge predicate-only nested paths into the root projection that is about to be scanned.
1313
    // Example: `SELECT s.a WHERE s.b > 1` first builds the output projection `s -> a` from
1314
    // ColumnMapping, while build_nested_struct_filter_projection_map() records `s -> b`. This merge
1315
    // produces one file scan projection `s -> a,b`.
1316
545
    RETURN_IF_ERROR(merge_local_column_index(projection, filter_projection_it->second));
1317
545
    return Status::OK();
1318
545
}
1319
1320
70
static bool table_root_is_map(const ColumnMapping& mapping) {
1321
70
    if (mapping.table_type == nullptr) {
1322
0
        return false;
1323
0
    }
1324
70
    return remove_nullable(mapping.table_type)->get_primitive_type() == TYPE_MAP;
1325
70
}
1326
1327
static Status add_scan_column(FileScanRequest* file_request, ColumnMapping* mapping,
1328
                              bool is_predicate_column, bool force_full_complex_scan_projection,
1329
305k
                              const FilterProjectionMap* filter_projections = nullptr) {
1330
305k
    const auto file_column_id = LocalColumnId(mapping->file_local_id.value());
1331
305k
    LocalColumnIndex projection = LocalColumnIndex::top_level(file_column_id);
1332
    // Columnar readers can turn a complex mapping into a nested file projection, but
1333
    // row-oriented readers must scan the full top-level complex field because all children are
1334
    // encoded in the same text cell.
1335
305k
    if (!force_full_complex_scan_projection && needs_nested_file_projection(*mapping)) {
1336
2.53k
        RETURN_IF_ERROR(build_complex_projection(*mapping, &projection));
1337
2.53k
    }
1338
305k
    if (is_predicate_column && !force_full_complex_scan_projection) {
1339
16.7k
        DCHECK(filter_projections != nullptr);
1340
        // If a projected complex root is also used by a predicate, rebuild the predicate scan
1341
        // projection from the output mapping before merging predicate-only children. For
1342
        // `SELECT s.a WHERE s.b > 1`, build_complex_projection() produces `s -> a` and
1343
        // merge_filter_projection() adds `s -> b`, so the predicate column reads both children.
1344
16.7k
        RETURN_IF_ERROR(merge_filter_projection(filter_projections, &projection));
1345
16.7k
    }
1346
305k
    FileScanRequestBuilder builder(file_request);
1347
305k
    if (is_predicate_column) {
1348
16.6k
        return builder.add_predicate_column(std::move(projection));
1349
16.6k
    }
1350
289k
    return builder.add_non_predicate_column(std::move(projection));
1351
305k
}
1352
1353
static const LocalColumnIndex* find_scan_projection(
1354
590k
        const std::vector<LocalColumnIndex>& scan_columns, LocalColumnId file_column_id) {
1355
590k
    const auto projection_it =
1356
7.11M
            std::ranges::find_if(scan_columns, [&](const LocalColumnIndex& projection) {
1357
7.11M
                return projection.column_id() == file_column_id;
1358
7.11M
            });
1359
590k
    return projection_it == scan_columns.end() ? nullptr : &*projection_it;
1360
590k
}
1361
1362
// Apply the final scan projection of one root file column back to its ColumnMapping. This updates
1363
// mapping.file_type/projected_file_children from the original file schema to the exact shape that
1364
// FileReader will return.
1365
//
1366
// Example: for `SELECT s.a WHERE s.b > 1`, add_scan_column() keeps only one predicate scan
1367
// projection `s -> a,b`. Applying that projection changes the mapping's file type from the full
1368
// file struct `s<a,b,c>` to the projected file struct `s<a,b>`, so later filter rewrite and
1369
// TableReader final materialization use the same column shape as the file-local block.
1370
static Status apply_scan_projection_to_mapping_file_type(const FileScanRequest& file_request,
1371
302k
                                                         ColumnMapping* mapping) {
1372
302k
    DORIS_CHECK(mapping != nullptr);
1373
302k
    DORIS_CHECK(mapping->file_local_id.has_value());
1374
302k
    const auto file_column_id = LocalColumnId(*mapping->file_local_id);
1375
    // Predicate columns are the actual scan projection when a column is used by row-level filters:
1376
    // add_scan_column() removes the duplicate non-predicate projection in that case.
1377
302k
    const auto* projection = find_scan_projection(file_request.predicate_columns, file_column_id);
1378
302k
    if (projection == nullptr) {
1379
288k
        projection = find_scan_projection(file_request.non_predicate_columns, file_column_id);
1380
288k
    }
1381
302k
    DORIS_CHECK(projection != nullptr);
1382
302k
    return apply_projection_to_mapping_file_type(*projection, mapping);
1383
302k
}
1384
1385
// Build extra scan projections required only by row-level filters on nested struct children.
1386
//
1387
// Example: for `SELECT s.a FROM t WHERE s.b.c > 1`, the output projection may only contain `s.a`,
1388
// but the file reader must also read `s.b.c` to evaluate the predicate. This function collects the
1389
// table-side filter path, resolves it through ColumnMapping first, and records the corresponding
1390
// file-side projection in filter_projections. This keeps renamed fields consistent across the scan
1391
// projection, row-level conjunct rewrite, and nested predicate pruning. Example:
1392
//   table filter path: s -> renamed_b -> c
1393
//   old file path:     s -> b -> c
1394
//   recorded path:     s -> b -> c
1395
// When add_scan_column() adds the same root as a predicate column, it rebuilds that root from the
1396
// output mapping, merges this filter-only projection into it, and removes the duplicate
1397
// non-predicate root entry.
1398
static Status build_nested_struct_filter_projection_map(
1399
        const std::vector<TableFilter>& table_filters, const std::vector<ColumnMapping>& mappings,
1400
38.7k
        FilterProjectionMap* filter_projections) {
1401
38.7k
    DORIS_CHECK(filter_projections != nullptr);
1402
38.7k
    filter_projections->clear();
1403
38.7k
    for (const auto& table_filter : table_filters) {
1404
23.0k
        if (table_filter.conjunct == nullptr) {
1405
2
            continue;
1406
2
        }
1407
        // Collect all nested struct paths in the table filter. For example, for
1408
        // `s.id > 5 AND element_at(s, 'renamed_name') = 'abc'`, collect the table paths
1409
        // `s -> id` and `s -> renamed_name`, then resolve each one to its file-side projection.
1410
23.0k
        std::vector<NestedStructPath> paths;
1411
23.0k
        collect_nested_struct_paths(table_filter.conjunct->root(), &paths);
1412
23.0k
        for (const auto& path : paths) {
1413
2.46k
            auto mapping_it = std::ranges::find_if(mappings, [&](const ColumnMapping& mapping) {
1414
2.46k
                return mapping.global_index == path.root_global_index;
1415
2.46k
            });
1416
820
            if (mapping_it == mappings.end() || !mapping_it->file_local_id.has_value() ||
1417
820
                path.selectors.empty()) {
1418
220
                continue;
1419
220
            }
1420
1421
600
            ResolvedNestedStructPath resolved;
1422
600
            LocalColumnIndex root_projection;
1423
600
            if (!resolve_nested_struct_path_for_file(path, mappings, &resolved)) {
1424
70
                if (!table_root_is_map(*mapping_it)) {
1425
69
                    continue;
1426
69
                }
1427
                // Direct map value filters such as `m.value.a > 1` need the value leaf for row
1428
                // evaluation even when the query only projects another value child. This is only a
1429
                // scan projection fallback; complex map/array expressions are still not rewritten
1430
                // into file-local conjuncts.
1431
1
                LocalColumnIndex child_projection;
1432
1
                RETURN_IF_ERROR(build_file_child_projection_from_schema(
1433
1
                        mapping_it->original_file_children, path.selectors, &child_projection));
1434
1
                if (child_projection.local_id() < 0) {
1435
0
                    continue;
1436
0
                }
1437
1
                root_projection = LocalColumnIndex::partial_local(*mapping_it->file_local_id);
1438
1
                root_projection.children.push_back(std::move(child_projection));
1439
530
            } else {
1440
530
                root_projection = std::move(resolved.file_projection);
1441
530
            }
1442
531
            auto filter_projection_it = filter_projections->find(root_projection.column_id());
1443
531
            if (filter_projection_it == filter_projections->end()) {
1444
495
                filter_projections->emplace(root_projection.column_id(),
1445
495
                                            std::move(root_projection));
1446
495
                continue;
1447
495
            }
1448
36
            RETURN_IF_ERROR(
1449
36
                    merge_local_column_index(&filter_projection_it->second, root_projection));
1450
36
        }
1451
23.0k
    }
1452
38.7k
    return Status::OK();
1453
38.7k
}
1454
1455
302k
static void rebuild_projection(ColumnMapping* mapping, LocalIndex block_position) {
1456
302k
    DORIS_CHECK(mapping->file_local_id.has_value());
1457
302k
    if (mapping->is_trivial || needs_complex_rematerialize(*mapping)) {
1458
300k
        mapping->projection = VExprContext::create_shared(VSlotRef::create_shared(
1459
300k
                cast_set<int>(block_position.value()), cast_set<int>(block_position.value()), -1,
1460
300k
                mapping->file_type, mapping->file_column_name));
1461
300k
        return;
1462
300k
    }
1463
1464
1.38k
    auto expr = Cast::create_shared(mapping->table_type);
1465
1.38k
    expr->add_child(VSlotRef::create_shared(cast_set<int>(block_position.value()),
1466
1.38k
                                            cast_set<int>(block_position.value()), -1,
1467
1.38k
                                            mapping->file_type, mapping->file_column_name));
1468
1.38k
    mapping->projection = VExprContext::create_shared(expr);
1469
1.38k
}
1470
1471
// Build file slot rewrite info from the localized filter targets. Only local targets can enter
1472
// file-reader expressions; constant and unset targets stay above the file reader.
1473
static std::map<GlobalIndex, FileSlotRewriteInfo> build_file_slot_rewrite_map(
1474
        const std::vector<ColumnMapping>& mappings,
1475
38.7k
        const std::map<GlobalIndex, FilterEntry>& filter_entries) {
1476
38.7k
    std::map<GlobalIndex, FileSlotRewriteInfo> global_to_file_slot;
1477
317k
    for (const auto& mapping : mappings) {
1478
317k
        const auto entry_it = filter_entries.find(mapping.global_index);
1479
317k
        if (entry_it == filter_entries.end() || !entry_it->second.is_local()) {
1480
14.8k
            continue;
1481
14.8k
        }
1482
302k
        DORIS_CHECK(mapping.file_local_id.has_value());
1483
302k
        global_to_file_slot.emplace(
1484
302k
                mapping.global_index,
1485
302k
                FileSlotRewriteInfo {.block_position = entry_it->second.local_index().value(),
1486
302k
                                     .file_type = mapping.file_type,
1487
302k
                                     .table_type = mapping.table_type,
1488
302k
                                     .file_column_name = mapping.file_column_name});
1489
302k
    }
1490
38.7k
    return global_to_file_slot;
1491
38.7k
}
1492
1493
Status TableColumnMapper::_create_by_index_mapping(const ColumnDefinition& table_column,
1494
                                                   const std::vector<ColumnDefinition>& file_schema,
1495
719
                                                   ColumnMapping* mapping) {
1496
719
    DORIS_CHECK(mapping != nullptr);
1497
719
    DORIS_CHECK(!table_column.is_partition_key);
1498
1499
    // Key contract: in BY_INDEX mode, `ColumnDefinition::identifier` TYPE_INT is interpreted as the
1500
    // 0-based position of this column inside `file_schema`. FE writes the physical file position
1501
    // of each non-partition projected column into that identifier. This interpretation allows:
1502
    //   - sparse projection: read only a subset of file columns (for example only `_col2`
1503
    //     and `_col4`);
1504
    //   - column reordering: table column order differs from file column order;
1505
    //   - no many-to-one mapping: FE must guarantee that each file position is referenced by at
1506
    //     most one table column.
1507
719
    const auto file_index = table_column.get_identifier_position();
1508
1509
    // Case A: file_index is in range, so build a direct positional mapping.
1510
    // The file column name (for example `_col0`) is intentionally ignored here.
1511
719
    if (file_index >= 0 && static_cast<size_t>(file_index) < file_schema.size()) {
1512
650
        return _create_direct_mapping(table_column, file_schema[static_cast<size_t>(file_index)],
1513
650
                                      mapping);
1514
650
    }
1515
1516
    // Case B: file_index is out of range, which means the file does not contain this column.
1517
    // Route it through the missing-column path used by schema evolution.
1518
69
    if (table_column.default_expr != nullptr) {
1519
65
        _set_constant_mapping(mapping, table_column.default_expr);
1520
65
        return Status::OK();
1521
65
    }
1522
    // Keep the mapping empty (`file_local_id` remains `nullopt`) and let the upper finalize
1523
    // stage fill NULL/default values.
1524
4
    return Status::OK();
1525
69
}
1526
1527
14.3k
void TableColumnMapper::_set_constant_mapping(ColumnMapping* mapping, VExprContextSPtr expr) {
1528
14.3k
    DORIS_CHECK(mapping != nullptr);
1529
14.3k
    DORIS_CHECK(expr != nullptr);
1530
14.3k
    mapping->default_expr = std::move(expr);
1531
14.3k
    mapping->constant_index = _constant_map.add(ConstantEntry {
1532
14.3k
            .global_index = mapping->global_index,
1533
14.3k
            .expr = mapping->default_expr,
1534
14.3k
            .type = mapping->table_type,
1535
14.3k
    });
1536
14.3k
    mapping->filter_conversion = FilterConversionType::CONSTANT;
1537
14.3k
}
1538
1539
Status TableColumnMapper::_create_mapping_for_column(const ColumnDefinition& table_column,
1540
                                                     GlobalIndex global_index,
1541
316k
                                                     ColumnMapping* mapping) {
1542
316k
    DORIS_CHECK(mapping != nullptr);
1543
316k
    *mapping = ColumnMapping {};
1544
316k
    mapping->global_index = global_index;
1545
316k
    mapping->table_column_name = table_column.name;
1546
316k
    mapping->table_type = table_column.type;
1547
316k
    const auto row_lineage_type = row_lineage_virtual_column_type(table_column, _options.mode);
1548
316k
    if (const auto* partition_value = find_partition_value(table_column, _partition_values);
1549
316k
        table_column.is_partition_key && partition_value != nullptr) {
1550
        // Partition values are split constants and must take precedence over defaults.
1551
10.6k
        _set_constant_mapping(mapping, VExprContext::create_shared(VLiteral::create_shared(
1552
10.6k
                                               mapping->table_type, *partition_value)));
1553
306k
    } else if (_options.mode == TableColumnMappingMode::BY_INDEX &&
1554
306k
               !table_column.is_partition_key && table_column.has_identifier_field_id()) {
1555
        // BY_INDEX interprets ColumnDefinition::identifier as physical file position.
1556
719
        RETURN_IF_ERROR(_create_by_index_mapping(table_column, _file_schema, mapping));
1557
305k
    } else if (const auto* file_field = _find_file_field(table_column, _file_schema)) {
1558
        // Normal physical file column mapping.
1559
301k
        RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, mapping));
1560
301k
        if (row_lineage_type != TableVirtualColumnType::INVALID) {
1561
            // Iceberg v3 rewritten files may physically contain row lineage metadata fields.
1562
            // File non-null values must be preserved, while file NULLs still inherit from data file
1563
            // metadata in IcebergTableReader. Therefore the mapping has a real file source plus a
1564
            // virtual post-materialization step, and filters must wait for finalize output.
1565
72
            mapping->virtual_column_type = row_lineage_type;
1566
72
            mapping->filter_conversion = FilterConversionType::FINALIZE_ONLY;
1567
72
        }
1568
301k
    } else if (row_lineage_type != TableVirtualColumnType::INVALID) {
1569
        // Iceberg row lineage metadata fields are optional in data files. Missing fields are exposed
1570
        // as all-NULL table columns first; IcebergTableReader fills inherited values only when the
1571
        // split carries first_row_id / last_updated_sequence_number metadata.
1572
        // FE may attach a default_expr to these hidden metadata columns, but the Iceberg v3
1573
        // inheritance rule must take precedence over the generic missing-column default path.
1574
397
        mapping->virtual_column_type = row_lineage_type;
1575
3.52k
    } else if (table_column.name == BeConsts::ICEBERG_ROWID_COL) {
1576
        // Doris internal Iceberg row locator is never a physical Iceberg data column. It is built
1577
        // from file path, row position and partition metadata for delete/update/merge.
1578
98
        mapping->virtual_column_type = TableVirtualColumnType::ICEBERG_ROWID;
1579
3.58k
    } else if (table_column.default_expr != nullptr) {
1580
        // Missing schema-evolution column with an explicit default expression.
1581
3.58k
        _set_constant_mapping(mapping, table_column.default_expr);
1582
18.4E
    } else {
1583
18.4E
        if (table_column.is_partition_key) {
1584
0
            return Status::InvalidArgument(
1585
0
                    "Table column '{}' (global_index={}) does not have a matching partition value",
1586
0
                    table_column.name, mapping->global_index.value());
1587
0
        }
1588
18.4E
    }
1589
316k
    return Status::OK();
1590
316k
}
1591
1592
Status TableColumnMapper::_create_hidden_filter_mapping(const ColumnDefinition& table_column,
1593
                                                        GlobalIndex global_index,
1594
2
                                                        ColumnMapping* mapping) {
1595
2
    auto status = _create_mapping_for_column(table_column, global_index, mapping);
1596
2
    if (mapping->file_local_id.has_value() || mapping->constant_index.has_value() ||
1597
2
        mapping->virtual_column_type != TableVirtualColumnType::INVALID) {
1598
0
        return Status::OK();
1599
0
    }
1600
2
    if (_options.mode == TableColumnMappingMode::BY_NAME) {
1601
0
        return status;
1602
0
    }
1603
1604
    // Predicate-only slot refs carry the table name/type but do not carry the table-format field
1605
    // id used by BY_FIELD_ID or the file position used by BY_INDEX. Use a name fallback only for
1606
    // hidden filter localization; projected columns still obey the requested mapping mode.
1607
2
    const auto* file_field =
1608
2
            matcher_for_mode(TableColumnMappingMode::BY_NAME).find(table_column, _file_schema);
1609
2
    if (file_field == nullptr) {
1610
0
        return status;
1611
0
    }
1612
2
    ColumnMapping fallback_mapping;
1613
2
    fallback_mapping.global_index = global_index;
1614
2
    fallback_mapping.table_column_name = table_column.name;
1615
2
    fallback_mapping.table_type = table_column.type;
1616
2
    RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field, &fallback_mapping));
1617
2
    *mapping = std::move(fallback_mapping);
1618
2
    return Status::OK();
1619
2
}
1620
1621
Status TableColumnMapper::_build_hidden_filter_mappings(
1622
38.6k
        const std::vector<TableFilter>& table_filters) {
1623
38.6k
    _hidden_mappings.clear();
1624
1625
38.6k
    std::map<GlobalIndex, ColumnDefinition> filter_columns;
1626
38.6k
    for (const auto& table_filter : table_filters) {
1627
23.0k
        if (table_filter.conjunct != nullptr) {
1628
23.0k
            collect_top_level_slot_columns(table_filter.conjunct->root(), &filter_columns);
1629
23.0k
        }
1630
23.0k
    }
1631
1632
    // TableColumnPredicates only carry GlobalIndex and predicate objects. They do not provide the
1633
    // top-level column name/type needed to build a hidden mapping, so a predicate-only column can
1634
    // be hidden-mapped only when the same root slot also appears in a conjunct.
1635
38.6k
    for (const auto& [global_index, table_column] : filter_columns) {
1636
17.6k
        if (_find_mapping(global_index) != nullptr) {
1637
            // Ignore columns that are already mapped by the projected columns
1638
17.6k
            continue;
1639
17.6k
        }
1640
8
        ColumnMapping mapping;
1641
8
        RETURN_IF_ERROR(_create_hidden_filter_mapping(table_column, global_index, &mapping));
1642
8
        if (mapping.file_local_id.has_value() || mapping.constant_index.has_value() ||
1643
8
            mapping.virtual_column_type != TableVirtualColumnType::INVALID) {
1644
2
            _hidden_mappings.push_back(std::move(mapping));
1645
2
        }
1646
8
    }
1647
38.6k
    return Status::OK();
1648
38.6k
}
1649
1650
Status TableColumnMapper::create_mapping(const std::vector<ColumnDefinition>& projected_columns,
1651
                                         const std::map<std::string, Field>& partition_values,
1652
38.7k
                                         const std::vector<ColumnDefinition>& file_schema) {
1653
38.7k
    clear();
1654
38.7k
    _partition_values = partition_values;
1655
38.7k
    _file_schema = file_schema;
1656
355k
    for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) {
1657
317k
        ColumnMapping mapping;
1658
317k
        RETURN_IF_ERROR(_create_mapping_for_column(projected_columns[column_idx],
1659
317k
                                                   GlobalIndex(column_idx), &mapping));
1660
317k
        _mappings.push_back(std::move(mapping));
1661
317k
    }
1662
38.7k
    return Status::OK();
1663
38.7k
}
1664
1665
116k
std::vector<ColumnMapping> TableColumnMapper::_filter_visible_mappings() const {
1666
116k
    std::vector<ColumnMapping> mappings;
1667
116k
    mappings.reserve(_mappings.size() + _hidden_mappings.size());
1668
116k
    mappings.insert(mappings.end(), _mappings.begin(), _mappings.end());
1669
116k
    mappings.insert(mappings.end(), _hidden_mappings.begin(), _hidden_mappings.end());
1670
116k
    return mappings;
1671
116k
}
1672
1673
38.6k
Status TableColumnMapper::_build_filter_entries(const FileScanRequest& file_request) {
1674
38.6k
    _filter_entries.clear();
1675
38.6k
    const auto mappings = _filter_visible_mappings();
1676
317k
    for (const auto& mapping : mappings) {
1677
317k
        FilterEntry entry;
1678
317k
        if (mapping.constant_index.has_value()) {
1679
14.3k
            entry = FilterEntry::constant(*mapping.constant_index);
1680
302k
        } else if (mapping.file_local_id.has_value() &&
1681
302k
                   filter_conversion_has_local_source(mapping.filter_conversion)) {
1682
302k
            const auto local_position_it =
1683
302k
                    file_request.local_positions.find(LocalColumnId(*mapping.file_local_id));
1684
302k
            if (local_position_it != file_request.local_positions.end()) {
1685
302k
                entry = FilterEntry::local(local_position_it->second);
1686
302k
            }
1687
302k
        }
1688
317k
        _filter_entries.emplace(mapping.global_index, entry);
1689
317k
    }
1690
38.6k
    return Status::OK();
1691
38.6k
}
1692
1693
Status TableColumnMapper::create_scan_request(
1694
        const std::vector<TableFilter>& table_filters,
1695
        const TableColumnPredicates& table_column_predicates,
1696
        const std::vector<ColumnDefinition>& projected_columns, FileScanRequest* file_request,
1697
38.6k
        RuntimeState* runtime_state) {
1698
    // FileReader evaluates expressions against a file-local block. This mapper owns the
1699
    // table-column to file-column conversion, so it also owns the file-local block positions.
1700
38.6k
    file_request->predicate_columns.clear();
1701
38.6k
    file_request->non_predicate_columns.clear();
1702
38.6k
    file_request->local_positions.clear();
1703
38.6k
    file_request->conjuncts.clear();
1704
38.6k
    file_request->delete_conjuncts.clear();
1705
38.6k
    file_request->column_predicate_filters.clear();
1706
38.6k
    _filter_entries.clear();
1707
    // 1. Build referenced non-predicate columns
1708
355k
    for (size_t column_idx = 0; column_idx < projected_columns.size(); ++column_idx) {
1709
316k
        const auto global_index = GlobalIndex(column_idx);
1710
316k
        auto* mapping = _find_mapping(global_index);
1711
316k
        if (mapping != nullptr && mapping->file_local_id.has_value()) {
1712
            // A file column can be read lazily as a non-predicate column only when it is not used
1713
            // by row-level expression filters. Single-column ColumnPredicate filters are pruning
1714
            // hints only and must not force row-level predicate materialization.
1715
302k
            bool used_by_filter = false;
1716
302k
            for (const auto& table_filter : table_filters) {
1717
47.2k
                const auto& global_indices = table_filter.global_indices;
1718
47.2k
                if (std::find(global_indices.begin(), global_indices.end(), global_index) !=
1719
47.2k
                            global_indices.end() &&
1720
47.2k
                    filter_conversion_has_local_source(mapping->filter_conversion)) {
1721
13.5k
                    used_by_filter = true;
1722
13.5k
                    break;
1723
13.5k
                }
1724
47.2k
            }
1725
302k
            if (!used_by_filter || !enable_lazy_materialization()) {
1726
288k
                RETURN_IF_ERROR(add_scan_column(file_request, mapping, false,
1727
288k
                                                force_full_complex_scan_projection()));
1728
288k
            }
1729
302k
        }
1730
316k
    }
1731
    // 2. Build referenced predicate columns
1732
    // Hidden filter mappings must be built before localizing filters, so that they can be localized together with visible mappings and referenced by localized filter expressions.
1733
38.6k
    RETURN_IF_ERROR(_build_hidden_filter_mappings(table_filters));
1734
38.6k
    RETURN_IF_ERROR(
1735
38.6k
            localize_filters(table_filters, table_column_predicates, file_request, runtime_state));
1736
    // 3. Rebuild output projection expressions for projected columns. localize_filters() has
1737
    // already applied the final scan projection to mapping.file_type/projected_file_children before
1738
    // rewriting filter expressions.
1739
317k
    for (auto& mapping : _mappings) {
1740
317k
        if (!mapping.file_local_id.has_value()) {
1741
14.8k
            continue;
1742
14.8k
        }
1743
302k
        auto position_it =
1744
302k
                file_request->local_positions.find(LocalColumnId(*mapping.file_local_id));
1745
302k
        DORIS_CHECK(position_it != file_request->local_positions.end())
1746
49
                << file_request->local_positions.size() << " " << *mapping.file_local_id << " "
1747
49
                << mapping.file_column_name;
1748
302k
        rebuild_projection(&mapping, position_it->second);
1749
302k
    }
1750
38.6k
    return Status::OK();
1751
38.6k
}
1752
1753
480k
ColumnMapping* TableColumnMapper::_find_mapping(GlobalIndex global_index) {
1754
8.55M
    for (auto& mapping : _mappings) {
1755
8.55M
        if (mapping.global_index == global_index) {
1756
480k
            return &mapping;
1757
480k
        }
1758
8.55M
    }
1759
112
    return nullptr;
1760
480k
}
1761
1762
145k
ColumnMapping* TableColumnMapper::_find_filter_mapping(GlobalIndex global_index) {
1763
145k
    if (auto* mapping = _find_mapping(global_index); mapping != nullptr) {
1764
145k
        return mapping;
1765
145k
    }
1766
74
    for (auto& mapping : _hidden_mappings) {
1767
2
        if (mapping.global_index == global_index) {
1768
2
            return &mapping;
1769
2
        }
1770
2
    }
1771
72
    return nullptr;
1772
74
}
1773
1774
Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table_filters,
1775
                                           const TableColumnPredicates& table_column_predicates,
1776
                                           FileScanRequest* file_request,
1777
38.7k
                                           RuntimeState* runtime_state) {
1778
38.7k
    FilterProjectionMap filter_projections;
1779
38.7k
    auto filter_mappings = _filter_visible_mappings();
1780
38.7k
    RETURN_IF_ERROR(build_nested_struct_filter_projection_map(table_filters, filter_mappings,
1781
38.7k
                                                              &filter_projections));
1782
38.7k
    for (const auto& table_filter : table_filters) {
1783
23.6k
        for (const auto& global_index : table_filter.global_indices) {
1784
23.6k
            auto* mapping = _find_filter_mapping(global_index);
1785
23.6k
            if (mapping == nullptr || !mapping->file_local_id.has_value() ||
1786
23.6k
                !filter_conversion_has_local_source(mapping->filter_conversion)) {
1787
6.64k
                continue;
1788
6.64k
            }
1789
16.9k
            RETURN_IF_ERROR(add_scan_column(file_request, mapping, enable_lazy_materialization(),
1790
16.9k
                                            force_full_complex_scan_projection(),
1791
16.9k
                                            &filter_projections));
1792
16.9k
        }
1793
23.0k
    }
1794
    // Rebuild the file type for every scan-local mapping before expression rewrite. Predicate-only
1795
    // hidden mappings must see the same projected file type as the file reader will produce.
1796
317k
    for (auto& mapping : _mappings) {
1797
317k
        if (mapping.file_local_id.has_value() &&
1798
317k
            file_request->local_positions.contains(LocalColumnId(*mapping.file_local_id))) {
1799
302k
            RETURN_IF_ERROR(apply_scan_projection_to_mapping_file_type(*file_request, &mapping));
1800
302k
        }
1801
317k
    }
1802
38.7k
    for (auto& mapping : _hidden_mappings) {
1803
2
        if (mapping.file_local_id.has_value() &&
1804
2
            file_request->local_positions.contains(LocalColumnId(*mapping.file_local_id))) {
1805
2
            RETURN_IF_ERROR(apply_scan_projection_to_mapping_file_type(*file_request, &mapping));
1806
2
        }
1807
2
    }
1808
38.7k
    RETURN_IF_ERROR(_build_filter_entries(*file_request));
1809
1810
    // Build the complete table-slot rewrite map after all predicate columns have been assigned.
1811
    // This keeps expression localization independent from filter iteration order.
1812
38.7k
    filter_mappings = _filter_visible_mappings();
1813
38.7k
    const auto global_to_file_slot = build_file_slot_rewrite_map(filter_mappings, _filter_entries);
1814
38.7k
    for (const auto& table_filter : table_filters) {
1815
23.0k
        if (table_filter.conjunct != nullptr &&
1816
23.0k
            table_filter_has_only_local_entries(table_filter, _filter_entries)) {
1817
16.5k
            RewriteContext rewrite_context {.runtime_state = runtime_state};
1818
16.5k
            VExprSPtr rewrite_root;
1819
16.5k
            Status clone_status;
1820
16.5k
            try {
1821
16.5k
                clone_status = clone_table_expr_tree(table_filter.conjunct->root(), &rewrite_root);
1822
16.5k
            } catch (const Exception& e) {
1823
                // Some table filters contain complex intermediate values, for example
1824
                // `element_at(MAP_VALUES(m)[1], 'age') > 30`. The current file-local rewrite only
1825
                // understands top-level slots and struct-element paths rooted at top-level slots;
1826
                // cloning such expressions can hit the generic TExpr complex-type limitation.
1827
                // Leave them above TableReader, where Scanner evaluates the original table-level
1828
                // conjunct after final materialization.
1829
0
#ifndef NDEBUG
1830
0
                return Status::InternalError(
1831
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1832
0
                        e.to_string(), table_filter.conjunct->root()->debug_string());
1833
#else
1834
                continue;
1835
#endif
1836
0
            } catch (const std::exception& e) {
1837
0
#ifndef NDEBUG
1838
0
                return Status::InternalError(
1839
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1840
0
                        e.what(), table_filter.conjunct->root()->debug_string());
1841
#else
1842
                continue;
1843
#endif
1844
0
            }
1845
16.5k
            if (!clone_status.ok()) {
1846
0
#ifndef NDEBUG
1847
0
                return Status::InternalError(
1848
0
                        "Failed to clone table filter for file-local rewrite: {}, expr={}",
1849
0
                        clone_status.to_string(), table_filter.conjunct->root()->debug_string());
1850
#else
1851
                continue;
1852
#endif
1853
0
            }
1854
16.5k
            bool can_localize = true;
1855
16.5k
            auto localized_root = rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot,
1856
16.5k
                                                                  filter_mappings, &rewrite_context,
1857
16.5k
                                                                  &can_localize);
1858
16.5k
            if (!can_localize) {
1859
1.28k
                continue;
1860
1.28k
            }
1861
15.2k
            auto localized_conjunct = VExprContext::create_shared(std::move(localized_root));
1862
15.2k
            RETURN_IF_ERROR(rewrite_context.prepare_created_exprs(localized_conjunct.get()));
1863
15.2k
            file_request->conjuncts.push_back(std::move(localized_conjunct));
1864
15.2k
        }
1865
23.0k
    }
1866
38.7k
    if (enable_column_predicate_filters()) {
1867
122k
        for (const auto& [global_index, predicates] : table_column_predicates) {
1868
122k
            const auto* mapping = _find_filter_mapping(global_index);
1869
122k
            const auto entry_it = _filter_entries.find(global_index);
1870
122k
            if (mapping == nullptr || !mapping->file_local_id.has_value() || predicates.empty() ||
1871
122k
                entry_it == _filter_entries.end() || !entry_it->second.is_local() ||
1872
122k
                !column_predicate_can_use_local_source(mapping->filter_conversion) ||
1873
122k
                mapping->file_type == nullptr) {
1874
114k
                continue;
1875
114k
            }
1876
7.91k
            FileColumnPredicateFilter column_predicate_filter;
1877
7.91k
            column_predicate_filter.file_column_id = LocalColumnId(*mapping->file_local_id);
1878
7.91k
            column_predicate_filter.target =
1879
7.91k
                    FileNestedPredicateTarget(column_predicate_filter.file_column_id);
1880
7.91k
            const auto file_primitive_type =
1881
7.91k
                    remove_nullable(mapping->file_type)->get_primitive_type();
1882
10.5k
            for (const auto& predicate : predicates) {
1883
10.5k
                DORIS_CHECK(predicate != nullptr);
1884
10.5k
                if (predicate->primitive_type() == file_primitive_type) {
1885
7.27k
                    column_predicate_filter.predicates.push_back(predicate);
1886
7.27k
                }
1887
10.5k
            }
1888
7.91k
            if (column_predicate_filter.predicates.empty()) {
1889
2.66k
                continue;
1890
2.66k
            }
1891
5.24k
            file_request->column_predicate_filters.push_back(std::move(column_predicate_filter));
1892
5.24k
        }
1893
32.5k
        for (const auto& table_filter : table_filters) {
1894
22.1k
            if (table_filter.conjunct == nullptr ||
1895
22.1k
                !table_filter_has_only_local_entries(table_filter, _filter_entries)) {
1896
5.93k
                continue;
1897
5.93k
            }
1898
16.2k
            std::vector<FileColumnPredicateFilter> nested_column_predicate_filters;
1899
16.2k
            collect_nested_column_predicate_filters(table_filter.conjunct->root(), filter_mappings,
1900
16.2k
                                                    &nested_column_predicate_filters);
1901
16.2k
            for (auto& column_predicate_filter : nested_column_predicate_filters) {
1902
350
                merge_column_predicate_filter(std::move(column_predicate_filter),
1903
350
                                              &file_request->column_predicate_filters);
1904
350
            }
1905
16.2k
        }
1906
32.5k
    }
1907
38.7k
    return Status::OK();
1908
38.7k
}
1909
1910
const ColumnDefinition* TableColumnMapper::_find_file_field(
1911
        const ColumnDefinition& table_column,
1912
305k
        const std::vector<ColumnDefinition>& file_schema) const {
1913
305k
    if (table_column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1914
14.9k
        const auto field_it = std::ranges::find_if(file_schema, [](const ColumnDefinition& field) {
1915
14.9k
            return field.column_type == ColumnType::GLOBAL_ROWID;
1916
14.9k
        });
1917
2.13k
        return field_it == file_schema.end() ? nullptr : &*field_it;
1918
2.13k
    }
1919
303k
    return matcher_for_mode(_options.mode).find(table_column, file_schema);
1920
305k
}
1921
1922
Status TableColumnMapper::_create_direct_mapping(const ColumnDefinition& table_column,
1923
                                                 const ColumnDefinition& file_field,
1924
554k
                                                 ColumnMapping* mapping) const {
1925
554k
    DORIS_CHECK(mapping != nullptr);
1926
554k
    DORIS_CHECK(file_field.local_id >= 0 || file_field.local_id == GLOBAL_ROWID_COLUMN_ID);
1927
554k
    mapping->file_local_id = file_field.local_id;
1928
554k
    mapping->table_column_name = table_column.name;
1929
554k
    mapping->file_column_name = file_field.name;
1930
554k
    mapping->original_file_type = file_field.type;
1931
554k
    mapping->original_file_children = file_field.children;
1932
554k
    mapping->projected_file_children = file_field.children;
1933
554k
    mapping->file_type = file_field.type;
1934
554k
    mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
1935
554k
    mapping->filter_conversion = mapping->is_trivial ? FilterConversionType::COPY_DIRECTLY
1936
554k
                                                     : FilterConversionType::CAST_FILTER;
1937
554k
    mapping->child_mappings.clear();
1938
1939
554k
    auto table_children = table_column.children;
1940
554k
    const auto nested_table_type = remove_nullable(mapping->table_type);
1941
    // Some scan paths, especially SELECT *, only carry the complete complex DataType for a table
1942
    // column and leave ColumnDefinition::children empty. If the file type is an older complex
1943
    // schema, treating this as a leaf mapping would make TableReader fall back to a plain CAST.
1944
    // That is invalid for evolved structs with different field counts.
1945
    //
1946
    // Example:
1947
    //   table column type: Map(String, Struct(age, full_name, gender))
1948
    //   old file type:    Map(String, Struct(age, name))
1949
    //   table children:   empty
1950
    //
1951
    // Synthesize key/value/struct-field children from the table type so the normal recursive
1952
    // mapping path can rematerialize `name -> full_name` and fill missing `gender` with defaults,
1953
    // instead of trying to CAST Struct(age, name) to Struct(age, full_name, gender).
1954
554k
    const bool synthesized_table_children =
1955
554k
            table_children.empty() && is_complex_type(nested_table_type->get_primitive_type()) &&
1956
554k
            !mapping->table_type->equals(*mapping->file_type);
1957
554k
    if (synthesized_table_children) {
1958
4
        table_children = synthesize_complex_children_from_type(mapping->table_type);
1959
554k
    } else if (!table_children.empty() && !mapping->table_type->equals(*mapping->file_type)) {
1960
6.64k
        complete_required_complex_children_from_type(mapping->table_type, &table_children);
1961
6.64k
    }
1962
1963
554k
    if (!table_children.empty()) {
1964
166k
        if (!is_complex_type(remove_nullable(mapping->file_type)->get_primitive_type())) {
1965
0
            return Status::NotSupported(
1966
0
                    "Cannot map complex table column '{}' to scalar parquet column '{}', table "
1967
0
                    "type={}, file type={}",
1968
0
                    table_column.name, file_field.name, mapping->table_type->get_name(),
1969
0
                    mapping->file_type->get_name());
1970
0
        }
1971
166k
        RETURN_IF_ERROR(validate_file_schema_children(file_field));
1972
166k
        std::vector<int32_t> synthesized_used_file_child_ids;
1973
421k
        for (size_t table_child_idx = 0; table_child_idx < table_children.size();
1974
254k
             ++table_child_idx) {
1975
254k
            const auto& table_child = table_children[table_child_idx];
1976
254k
            const auto* file_child =
1977
254k
                    find_file_child_for_mapping(table_child, file_field, _options.mode,
1978
254k
                                                table_child_idx, synthesized_table_children);
1979
254k
            if (synthesized_table_children && file_child != nullptr) {
1980
8
                const auto file_child_id = file_child->file_local_id();
1981
8
                if (std::ranges::find(synthesized_used_file_child_ids, file_child_id) !=
1982
8
                    synthesized_used_file_child_ids.end()) {
1983
2
                    file_child = nullptr;
1984
2
                    for (const auto& candidate : file_field.children) {
1985
2
                        const auto candidate_id = candidate.file_local_id();
1986
2
                        if (std::ranges::find(synthesized_used_file_child_ids, candidate_id) ==
1987
2
                            synthesized_used_file_child_ids.end()) {
1988
2
                            file_child = &candidate;
1989
2
                            break;
1990
2
                        }
1991
2
                    }
1992
2
                }
1993
8
                if (file_child != nullptr) {
1994
8
                    synthesized_used_file_child_ids.push_back(file_child->file_local_id());
1995
8
                }
1996
8
            }
1997
254k
            if (file_child == nullptr) {
1998
2.50k
                ColumnMapping child_mapping;
1999
2.50k
                child_mapping.table_column_name = table_child.name;
2000
2.50k
                child_mapping.file_column_name = table_child.name;
2001
2.50k
                child_mapping.table_type = table_child.type;
2002
2.50k
                child_mapping.file_type = table_child.type;
2003
2.50k
                child_mapping.filter_conversion = FilterConversionType::FINALIZE_ONLY;
2004
2.50k
                mapping->child_mappings.push_back(std::move(child_mapping));
2005
2.50k
                continue;
2006
2.50k
            }
2007
252k
            ColumnMapping child_mapping;
2008
252k
            child_mapping.table_column_name = table_child.name;
2009
252k
            child_mapping.table_type = table_child.type;
2010
252k
            RETURN_IF_ERROR(_create_direct_mapping(table_child, *file_child, &child_mapping));
2011
252k
            mapping->child_mappings.push_back(std::move(child_mapping));
2012
252k
        }
2013
166k
        if (needs_projected_file_type_rebuild(*mapping)) {
2014
            // If complex projection prunes some children, we have to rebuild the projected file type to make sure the reader expression can find the correct child types by name.
2015
6.95k
            RETURN_IF_ERROR(rebuild_projected_file_children_and_type(
2016
6.95k
                    mapping->file_type, mapping->original_file_children, mapping->child_mappings,
2017
6.95k
                    &mapping->projected_file_children, &mapping->file_type));
2018
6.95k
            DCHECK(mapping->table_type != nullptr);
2019
6.95k
            mapping->is_trivial = mapping_can_use_file_column_directly(*mapping);
2020
            // TODO: ? READER_EXPRESSION
2021
6.95k
            mapping->filter_conversion = mapping->is_trivial
2022
6.95k
                                                 ? FilterConversionType::COPY_DIRECTLY
2023
6.95k
                                                 : FilterConversionType::READER_EXPRESSION;
2024
6.95k
        }
2025
166k
    }
2026
554k
    return Status::OK();
2027
554k
}
2028
2029
} // namespace doris::format