Coverage Report

Created: 2026-06-09 13:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/ExternalTableSchema_types.h>
23
#include <gen_cpp/PlanNodes_types.h>
24
25
#include <algorithm>
26
#include <charconv>
27
#include <map>
28
#include <memory>
29
#include <optional>
30
#include <string>
31
#include <string_view>
32
#include <utility>
33
34
#include "common/cast_set.h"
35
#include "common/config.h"
36
#include "common/status.h"
37
#include "core/assert_cast.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/column/column.h"
40
#include "core/data_type/data_type.h"
41
#include "core/data_type/data_type_array.h"
42
#include "core/data_type/data_type_map.h"
43
#include "core/data_type/data_type_nullable.h"
44
#include "core/data_type/data_type_struct.h"
45
#include "core/data_type_serde/data_type_serde.h"
46
#include "core/string_ref.h"
47
#include "exec/common/util.hpp"
48
#include "exec/operator/scan_operator.h"
49
#include "exprs/vexpr.h"
50
#include "exprs/vexpr_context.h"
51
#include "exprs/vslot_ref.h"
52
#include "format/format_common.h"
53
#include "format_v2/column_mapper.h"
54
#include "format_v2/expr/slot_ref.h"
55
#include "format_v2/table/hive_reader.h"
56
#include "format_v2/table/paimon_reader.h"
57
#include "format_v2/table_reader.h"
58
#include "format_v2/table/iceberg_reader.h"
59
#include "io/io_common.h"
60
#include "runtime/descriptors.h"
61
#include "runtime/runtime_state.h"
62
63
namespace doris {
64
namespace {
65
66
6
std::string table_format_name(const TFileRangeDesc& range) {
67
6
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
68
6
                                             : "NotSet";
69
6
}
70
71
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
72
8
                                            const TFileRangeDesc& range) {
73
8
    return range.__isset.format_type ? range.format_type : params.format_type;
74
8
}
75
76
6
bool is_supported_table_format(const TFileRangeDesc& range) {
77
6
    const auto table_format = table_format_name(range);
78
6
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
79
6
           table_format == "iceberg" || table_format == "paimon";
80
6
}
81
82
0
bool is_partition_slot(const TFileScanSlotInfo& slot_info) {
83
0
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
84
0
                                      : !slot_info.is_file_slot;
85
0
}
86
87
2
bool parse_non_negative_int(std::string_view value, int32_t* result) {
88
2
    DORIS_CHECK(result != nullptr);
89
2
    int32_t parsed = -1;
90
2
    const auto* begin = value.data();
91
2
    const auto* end = begin + value.size();
92
2
    const auto [ptr, ec] = std::from_chars(begin, end, parsed);
93
2
    if (ec != std::errc() || ptr != end || parsed < 0) {
94
0
        return false;
95
0
    }
96
2
    *result = parsed;
97
2
    return true;
98
2
}
99
100
0
std::string access_path_to_string(const std::vector<std::string>& path) {
101
0
    return fmt::format("{}", fmt::join(path, "."));
102
0
}
103
104
format::ColumnDefinition* find_or_add_child(format::ColumnDefinition* parent, int32_t id,
105
8
                                            std::string name, DataTypePtr type) {
106
8
    DORIS_CHECK(parent != nullptr);
107
8
    for (auto& child : parent->children) {
108
2
        if ((child.has_identifier_field_id() && child.get_identifier_field_id() == id) ||
109
2
            child.name == name) {
110
0
            return &child;
111
0
        }
112
2
    }
113
8
    parent->children.push_back({
114
8
            .identifier = Field::create_field<TYPE_INT>(id),
115
8
            .name = std::move(name),
116
8
            .type = std::move(type),
117
8
            .children = {},
118
8
            .default_expr = nullptr,
119
8
            .is_partition_key = false,
120
8
    });
121
8
    return &parent->children.back();
122
8
}
123
124
0
const schema::external::TField* get_field_ptr(const schema::external::TFieldPtr& field_ptr) {
125
0
    if (!field_ptr.__isset.field_ptr || field_ptr.field_ptr == nullptr) {
126
0
        return nullptr;
127
0
    }
128
0
    return field_ptr.field_ptr.get();
129
0
}
130
131
0
bool external_field_matches_name(const schema::external::TField& field, const std::string& name) {
132
0
    if (field.__isset.name && to_lower(field.name) == to_lower(name)) {
133
0
        return true;
134
0
    }
135
0
    return field.__isset.name_mapping &&
136
0
           std::ranges::any_of(field.name_mapping, [&](const std::string& alias) {
137
0
               return to_lower(alias) == to_lower(name);
138
0
           });
139
0
}
140
141
DataTypePtr find_struct_child_type_by_name(const DataTypeStruct& struct_type,
142
0
                                           const std::string& field_name) {
143
0
    for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
144
0
        if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
145
0
            return struct_type.get_element(field_idx);
146
0
        }
147
0
    }
148
0
    return nullptr;
149
0
}
150
151
format::ColumnDefinition build_schema_column_from_external_field(
152
0
        const schema::external::TField& field, DataTypePtr type) {
153
0
    format::ColumnDefinition column {
154
0
            .identifier = field.__isset.id ? Field::create_field<TYPE_INT>(field.id) : Field {},
155
0
            .name = field.__isset.name ? field.name : "",
156
0
            .name_mapping = field.__isset.name_mapping ? field.name_mapping
157
0
                                                       : std::vector<std::string> {},
158
0
            .type = std::move(type),
159
0
            .children = {},
160
0
            .default_expr = nullptr,
161
0
            .is_partition_key = false,
162
0
    };
163
0
    if (column.type == nullptr || !field.__isset.nestedField) {
164
0
        return column;
165
0
    }
166
167
0
    const auto nested_type = remove_nullable(column.type);
168
0
    switch (nested_type->get_primitive_type()) {
169
0
    case TYPE_STRUCT: {
170
0
        if (!field.nestedField.__isset.struct_field ||
171
0
            !field.nestedField.struct_field.__isset.fields) {
172
0
            return column;
173
0
        }
174
0
        const auto& struct_type = assert_cast<const DataTypeStruct&>(*nested_type);
175
0
        for (const auto& child_ptr : field.nestedField.struct_field.fields) {
176
0
            const auto* child_field = get_field_ptr(child_ptr);
177
0
            if (child_field == nullptr || !child_field->__isset.name) {
178
0
                continue;
179
0
            }
180
0
            auto child_type = find_struct_child_type_by_name(struct_type, child_field->name);
181
0
            if (child_type == nullptr) {
182
0
                continue;
183
0
            }
184
0
            column.children.push_back(
185
0
                    build_schema_column_from_external_field(*child_field, child_type));
186
0
        }
187
0
        break;
188
0
    }
189
0
    case TYPE_ARRAY: {
190
0
        if (!field.nestedField.__isset.array_field ||
191
0
            !field.nestedField.array_field.__isset.item_field) {
192
0
            return column;
193
0
        }
194
0
        const auto* item_field = get_field_ptr(field.nestedField.array_field.item_field);
195
0
        if (item_field == nullptr) {
196
0
            return column;
197
0
        }
198
0
        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
199
0
        column.children.push_back(
200
0
                build_schema_column_from_external_field(*item_field, array_type.get_nested_type()));
201
0
        break;
202
0
    }
203
0
    case TYPE_MAP: {
204
0
        if (!field.nestedField.__isset.map_field ||
205
0
            !field.nestedField.map_field.__isset.key_field ||
206
0
            !field.nestedField.map_field.__isset.value_field) {
207
0
            return column;
208
0
        }
209
0
        const auto& map_type = assert_cast<const DataTypeMap&>(*nested_type);
210
0
        const auto* key_field = get_field_ptr(field.nestedField.map_field.key_field);
211
0
        if (key_field != nullptr) {
212
0
            column.children.push_back(
213
0
                    build_schema_column_from_external_field(*key_field, map_type.get_key_type()));
214
0
        }
215
0
        const auto* value_field = get_field_ptr(field.nestedField.map_field.value_field);
216
0
        if (value_field != nullptr) {
217
0
            column.children.push_back(build_schema_column_from_external_field(
218
0
                    *value_field, map_type.get_value_type()));
219
0
        }
220
0
        break;
221
0
    }
222
0
    default:
223
0
        break;
224
0
    }
225
0
    return column;
226
0
}
227
228
const format::ColumnDefinition* find_schema_child_by_path(
229
4
        const format::ColumnDefinition* schema_column, const std::string& child_path) {
230
4
    if (schema_column == nullptr) {
231
3
        return nullptr;
232
3
    }
233
1
    int32_t parsed_field_id = -1;
234
1
    if (parse_non_negative_int(child_path, &parsed_field_id)) {
235
1
        const auto child_it = std::ranges::find_if(
236
2
                schema_column->children, [&](const format::ColumnDefinition& child) {
237
2
                    return child.has_identifier_field_id() &&
238
2
                           child.get_identifier_field_id() == parsed_field_id;
239
2
                });
240
1
        return child_it == schema_column->children.end() ? nullptr : &*child_it;
241
1
    }
242
0
    const auto child_it = std::ranges::find_if(schema_column->children, [&](const auto& child) {
243
0
        if (to_lower(child.name) == to_lower(child_path)) {
244
0
            return true;
245
0
        }
246
0
        return std::ranges::any_of(child.name_mapping, [&](const std::string& alias) {
247
0
            return to_lower(alias) == to_lower(child_path);
248
0
        });
249
0
    });
250
0
    return child_it == schema_column->children.end() ? nullptr : &*child_it;
251
1
}
252
253
4
int32_t schema_field_id(const format::ColumnDefinition* schema_column) {
254
4
    if (schema_column == nullptr || !schema_column->has_identifier_field_id()) {
255
3
        return -1;
256
3
    }
257
1
    return schema_column->get_identifier_field_id();
258
4
}
259
260
const schema::external::TField* find_external_root_field(const TFileScanRangeParams* params,
261
0
                                                         const format::ColumnDefinition& column) {
262
0
    if (params == nullptr || !params->__isset.history_schema_info ||
263
0
        params->history_schema_info.empty()) {
264
0
        return nullptr;
265
0
    }
266
0
    const auto* schema = &params->history_schema_info.front();
267
0
    if (params->__isset.current_schema_id) {
268
0
        for (const auto& candidate_schema : params->history_schema_info) {
269
0
            if (candidate_schema.__isset.schema_id &&
270
0
                candidate_schema.schema_id == params->current_schema_id) {
271
0
                schema = &candidate_schema;
272
0
                break;
273
0
            }
274
0
        }
275
0
    }
276
0
    if (!schema->__isset.root_field || !schema->root_field.__isset.fields) {
277
0
        return nullptr;
278
0
    }
279
0
    for (const auto& field_ptr : schema->root_field.fields) {
280
0
        const auto* field = get_field_ptr(field_ptr);
281
0
        if (field == nullptr) {
282
0
            continue;
283
0
        }
284
0
        if (external_field_matches_name(*field, column.name)) {
285
0
            return field;
286
0
        }
287
0
    }
288
0
    return nullptr;
289
0
}
290
291
struct AccessPathNode {
292
    bool project_all = false;
293
    std::map<std::string, AccessPathNode> children;
294
};
295
296
5
void merge_access_path_node(AccessPathNode* dst, const AccessPathNode& src) {
297
5
    DORIS_CHECK(dst != nullptr);
298
5
    if (dst->project_all) {
299
1
        return;
300
1
    }
301
4
    if (src.project_all) {
302
2
        dst->project_all = true;
303
2
        dst->children.clear();
304
2
        return;
305
2
    }
306
2
    for (const auto& [path, child] : src.children) {
307
2
        merge_access_path_node(&dst->children[path], child);
308
2
    }
309
2
}
310
311
void insert_access_path(AccessPathNode* root, const std::vector<std::string>& path,
312
15
                        size_t path_idx) {
313
15
    DORIS_CHECK(root != nullptr);
314
15
    if (root->project_all) {
315
1
        return;
316
1
    }
317
14
    if (path_idx >= path.size()) {
318
6
        root->project_all = true;
319
6
        root->children.clear();
320
6
        return;
321
6
    }
322
8
    insert_access_path(&root->children[path[path_idx]], path, path_idx + 1);
323
8
}
324
325
Status build_nested_children_from_access_node(format::ColumnDefinition* column,
326
                                              const DataTypePtr& type, const AccessPathNode& node,
327
                                              const std::string& path,
328
                                              const format::ColumnDefinition* schema_column);
329
330
Status build_struct_children_from_access_node(format::ColumnDefinition* column,
331
                                              const DataTypeStruct& struct_type,
332
                                              const AccessPathNode& node, const std::string& path,
333
3
                                              const format::ColumnDefinition* schema_column) {
334
3
    DORIS_CHECK(column != nullptr);
335
4
    for (const auto& [child_path, child_node] : node.children) {
336
        // Currently we do not support accessing struct children by position (e.g. "col.0") because it can be ambiguous and error-prone when the struct schema evolves. We only support accessing struct children by name (e.g. "col.child"). If needed, we can consider adding support for position-based access in the future with careful design and consideration.
337
4
        if (child_path == "OFFSET" || child_path == "*" || child_path == "KEYS" ||
338
4
            child_path == "VALUES") {
339
0
            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
340
0
                                        path + "." + child_path, column->name);
341
0
        }
342
343
        // Try to find the child field in the schema column first. If not found, fallback to find the child field in the struct type by name (case-insensitive).
344
4
        const auto* schema_child = find_schema_child_by_path(schema_column, child_path);
345
4
        int32_t field_id = schema_field_id(schema_child);
346
4
        std::string field_name = schema_child == nullptr ? child_path : schema_child->name;
347
4
        DataTypePtr field_type = schema_child == nullptr ? nullptr : schema_child->type;
348
4
        if (field_id < 0 || field_type == nullptr) {
349
4
            for (size_t field_idx = 0; field_idx < struct_type.get_elements().size(); ++field_idx) {
350
4
                if (to_lower(struct_type.get_element_name(field_idx)) == to_lower(field_name)) {
351
3
                    field_id = cast_set<int32_t>(field_idx);
352
3
                    field_name = struct_type.get_element_name(field_idx);
353
3
                    field_type = struct_type.get_element(field_idx);
354
3
                    break;
355
3
                }
356
4
            }
357
3
        }
358
359
4
        if (field_id < 0 || field_type == nullptr) {
360
0
            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
361
0
                                        path + "." + child_path, column->name);
362
0
        }
363
        // TODO: For TVF Parquet files without field ids, this fallback uses the struct ordinal as
364
        // the table child identifier. BY_NAME mapping should instead keep a string identifier and
365
        // let TableColumnMapper resolve the file-local child id from the Parquet schema.
366
4
        auto* child = find_or_add_child(column, field_id, field_name, field_type);
367
4
        RETURN_IF_ERROR(build_nested_children_from_access_node(
368
4
                child, child->type, child_node, path + "." + child_path, schema_child));
369
4
    }
370
3
    return Status::OK();
371
3
}
372
373
Status build_map_children_from_access_node(format::ColumnDefinition* column,
374
                                           const DataTypeMap& map_type, const AccessPathNode& node,
375
                                           const std::string& path,
376
1
                                           const format::ColumnDefinition* schema_column) {
377
1
    DORIS_CHECK(column != nullptr);
378
1
    AccessPathNode key_node;
379
1
    AccessPathNode value_node;
380
1
    bool need_key = false;
381
1
    bool need_value = false;
382
383
3
    for (const auto& [child_path, child_node] : node.children) {
384
3
        if (child_path == "OFFSET") {
385
0
            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
386
0
                                        path + "." + child_path, column->name);
387
0
        }
388
3
        if (child_path == "KEYS") {
389
1
            need_key = true;
390
1
            merge_access_path_node(&key_node, child_node);
391
1
            continue;
392
1
        }
393
2
        if (child_path == "VALUES") {
394
1
            need_key = true;
395
1
            key_node.project_all = true;
396
1
            key_node.children.clear();
397
1
            need_value = true;
398
1
            merge_access_path_node(&value_node, child_node);
399
1
            continue;
400
1
        }
401
1
        if (child_path == "*") {
402
1
            need_key = true;
403
1
            key_node.project_all = true;
404
1
            key_node.children.clear();
405
1
            need_value = true;
406
1
            merge_access_path_node(&value_node, child_node);
407
1
            continue;
408
1
        }
409
0
        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
410
0
                                    path + "." + child_path, column->name);
411
1
    }
412
1
    if (need_key && !need_value) {
413
        // Keep value readable until the downstream map materialization path can construct a table
414
        // Map column from keys only.
415
0
        need_value = true;
416
0
        value_node.project_all = true;
417
0
        value_node.children.clear();
418
0
    }
419
420
1
    DataTypes entry_child_types;
421
1
    Strings entry_child_names;
422
1
    if (need_key) {
423
1
        entry_child_types.push_back(map_type.get_key_type());
424
1
        entry_child_names.push_back("key");
425
1
    }
426
1
    if (need_value) {
427
1
        entry_child_types.push_back(map_type.get_value_type());
428
1
        entry_child_names.push_back("value");
429
1
    }
430
1
    if (entry_child_types.empty()) {
431
0
        return Status::OK();
432
0
    }
433
434
1
    auto entry_type = std::make_shared<DataTypeStruct>(entry_child_types, entry_child_names);
435
1
    auto* entry_child = find_or_add_child(column, 0, "entries", entry_type);
436
1
    const auto* key_schema = schema_column != nullptr && !schema_column->children.empty()
437
1
                                     ? &schema_column->children[0]
438
1
                                     : nullptr;
439
1
    const auto* value_schema = schema_column != nullptr && schema_column->children.size() > 1
440
1
                                       ? &schema_column->children[1]
441
1
                                       : nullptr;
442
1
    if (need_key) {
443
1
        auto* key_child = find_or_add_child(entry_child, 0, "key", map_type.get_key_type());
444
1
        RETURN_IF_ERROR(build_nested_children_from_access_node(key_child, key_child->type, key_node,
445
1
                                                               path + ".KEYS", key_schema));
446
1
    }
447
1
    if (need_value) {
448
1
        auto* value_child = find_or_add_child(entry_child, 1, "value", map_type.get_value_type());
449
1
        RETURN_IF_ERROR(build_nested_children_from_access_node(
450
1
                value_child, value_child->type, value_node, path + ".VALUES", value_schema));
451
1
    }
452
1
    return Status::OK();
453
1
}
454
455
Status build_nested_children_from_access_node(format::ColumnDefinition* column,
456
                                              const DataTypePtr& type, const AccessPathNode& node,
457
                                              const std::string& path,
458
11
                                              const format::ColumnDefinition* schema_column) {
459
11
    DORIS_CHECK(column != nullptr);
460
11
    if (node.project_all || node.children.empty()) {
461
        // If project_all is true or there is no specific child path, we need to project all children of the complex type.
462
6
        return Status::OK();
463
6
    }
464
465
5
    const auto nested_type = remove_nullable(type);
466
5
    switch (nested_type->get_primitive_type()) {
467
3
    case TYPE_STRUCT:
468
3
        return build_struct_children_from_access_node(
469
3
                column, assert_cast<const DataTypeStruct&>(*nested_type), node, path,
470
3
                schema_column);
471
1
    case TYPE_ARRAY: {
472
1
        if (node.children.size() != 1 || !node.children.contains("*")) {
473
0
            return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
474
0
                                        path, column->name);
475
0
        }
476
1
        const auto& array_type = assert_cast<const DataTypeArray&>(*nested_type);
477
1
        auto* child = find_or_add_child(column, 0, "element", array_type.get_nested_type());
478
1
        const auto* element_schema = schema_column != nullptr && !schema_column->children.empty()
479
1
                                             ? &schema_column->children[0]
480
1
                                             : nullptr;
481
1
        return build_nested_children_from_access_node(child, child->type, node.children.at("*"),
482
1
                                                      path + ".*", element_schema);
483
1
    }
484
1
    case TYPE_MAP:
485
1
        return build_map_children_from_access_node(
486
1
                column, assert_cast<const DataTypeMap&>(*nested_type), node, path, schema_column);
487
0
    default:
488
0
        return Status::NotSupported("FileScannerV2 does not support access path {} for slot {}",
489
0
                                    path, column->name);
490
5
    }
491
5
}
492
493
Status build_nested_children_from_access_paths(format::ColumnDefinition* column,
494
                                               const TColumnAccessPaths& access_paths,
495
4
                                               const format::ColumnDefinition* schema_column) {
496
4
    DORIS_CHECK(column != nullptr);
497
4
    if (!is_complex_type(remove_nullable(column->type)->get_primitive_type())) {
498
0
        return Status::OK();
499
0
    }
500
501
4
    AccessPathNode root;
502
    // Build tree for AccessPathNode.
503
    // For example, for access paths ["a.b", "a.c", "d"], the tree will be:
504
    // root
505
    // ├── a
506
    // │   ├── b
507
    // │   └── c
508
    // └── d
509
7
    for (const auto& access_path : access_paths) {
510
        // TODO: Support META access paths if needed. Currently FileScannerV2 only supports DATA access paths.
511
7
        if (access_path.type != TAccessPathType::DATA || !access_path.__isset.data_access_path) {
512
0
            return Status::NotSupported("FileScannerV2 only supports DATA access paths for slot {}",
513
0
                                        column->name);
514
0
        }
515
7
        const auto& path = access_path.data_access_path.path;
516
7
        if (path.empty()) {
517
0
            insert_access_path(&root, path, 0);
518
0
            continue;
519
0
        }
520
7
        int32_t top_level_id = -1;
521
7
        if (to_lower(path.front()) != to_lower(column->name) &&
522
7
            (!parse_non_negative_int(path.front(), &top_level_id) ||
523
1
             !column->has_identifier_field_id() ||
524
1
             top_level_id != column->get_identifier_field_id())) {
525
0
            return Status::NotSupported("FileScannerV2 access path {} does not match slot {}",
526
0
                                        access_path_to_string(path), column->name);
527
0
        }
528
7
        insert_access_path(&root, path, 1);
529
7
    }
530
    // Recursively build nested children for the column based on the AccessPathNode tree.
531
4
    return build_nested_children_from_access_node(column, column->type, root, column->name,
532
4
                                                  schema_column);
533
4
}
534
535
Status build_nested_children_from_access_paths(format::ColumnDefinition* column,
536
                                               const SlotDescriptor* slot_desc,
537
0
                                               const format::ColumnDefinition* schema_column) {
538
0
    DORIS_CHECK(column != nullptr);
539
0
    DORIS_CHECK(slot_desc != nullptr);
540
0
    return build_nested_children_from_access_paths(column, slot_desc->all_access_paths(),
541
0
                                                   schema_column);
542
0
}
543
544
Status rewrite_slot_refs_to_global_index(
545
        VExprSPtr* expr,
546
0
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
547
0
    DORIS_CHECK(expr != nullptr);
548
0
    if (*expr == nullptr) {
549
0
        return Status::OK();
550
0
    }
551
0
    if ((*expr)->is_slot_ref()) {
552
0
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
553
0
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
554
0
        if (global_index_it == slot_id_to_global_index.end()) {
555
0
            DORIS_CHECK(slot_ref->slot_id() >= 0);
556
0
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
557
0
            *expr = TableSlotRef::create_shared(cast_set<int>(global_index.value()),
558
0
                                                cast_set<int>(global_index.value()), -1,
559
0
                                                slot_ref->data_type(), slot_ref->column_name());
560
0
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
561
0
            return Status::OK();
562
0
        }
563
0
        const auto global_index = global_index_it->second;
564
0
        *expr = TableSlotRef::create_shared(cast_set<int>(global_index.value()),
565
0
                                            cast_set<int>(global_index.value()), -1,
566
0
                                            slot_ref->data_type(), slot_ref->column_name());
567
0
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
568
0
        return Status::OK();
569
0
    }
570
0
    auto children = (*expr)->children();
571
0
    for (auto& child : children) {
572
0
        if (child == nullptr) {
573
0
            continue;
574
0
        }
575
0
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
576
0
    }
577
0
    (*expr)->set_children(std::move(children));
578
0
    return Status::OK();
579
0
}
580
581
} // namespace
582
583
#ifdef BE_TEST
584
Status FileScannerV2::TEST_build_nested_children_from_access_paths(
585
3
        format::ColumnDefinition* column, const TColumnAccessPaths& access_paths) {
586
3
    return build_nested_children_from_access_paths(column, access_paths, nullptr);
587
3
}
588
589
Status FileScannerV2::TEST_build_nested_children_from_access_paths(
590
        format::ColumnDefinition* column, const TColumnAccessPaths& access_paths,
591
1
        const format::ColumnDefinition* schema_column) {
592
1
    return build_nested_children_from_access_paths(column, access_paths, schema_column);
593
1
}
594
#endif
595
596
// TODO: Only support parquet format now
597
8
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
598
8
    return get_range_format_type(params, range) == TFileFormatType::FORMAT_PARQUET &&
599
8
           is_supported_table_format(range);
600
8
}
601
602
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
603
                             std::shared_ptr<SplitSourceConnector> split_source,
604
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
605
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
606
0
        : Scanner(state, local_state, limit, profile),
607
0
          _split_source(std::move(split_source)),
608
0
          _kv_cache(kv_cache) {
609
0
    (void)colname_to_slot_id;
610
0
    if (state->get_query_ctx() != nullptr &&
611
0
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
612
0
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
613
0
    } else {
614
0
        _params = _split_source->get_params();
615
0
    }
616
0
}
617
618
0
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
619
0
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
620
0
    _get_block_timer =
621
0
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
622
0
    _file_counter =
623
0
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
624
0
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
625
0
                                                      "FileReadBytes", TUnit::BYTES, 1);
626
0
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
627
0
                                                      "FileReadCalls", TUnit::UNIT, 1);
628
0
    _file_read_time_counter =
629
0
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
630
0
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
631
0
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
632
0
    RETURN_IF_ERROR(_init_io_ctx());
633
0
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
634
0
    _io_ctx->file_reader_stats = _file_reader_stats.get();
635
0
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
636
0
    return Status::OK();
637
0
}
638
639
0
Status FileScannerV2::_open_impl(RuntimeState* state) {
640
0
    RETURN_IF_CANCELLED(state);
641
0
    RETURN_IF_ERROR(Scanner::_open_impl(state));
642
0
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
643
0
    if (_first_scan_range) {
644
0
        RETURN_IF_ERROR(_init_expr_ctxes());
645
0
    }
646
0
    return Status::OK();
647
0
}
648
649
0
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
650
0
    while (true) {
651
0
        RETURN_IF_CANCELLED(state);
652
0
        if (_table_reader == nullptr) {
653
0
            RETURN_IF_ERROR(_prepare_next_split(eof));
654
0
            if (*eof) {
655
0
                return Status::OK();
656
0
            }
657
0
        }
658
659
0
        {
660
0
            SCOPED_TIMER(_get_block_timer);
661
0
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
662
0
        }
663
0
        if (*eof) {
664
0
            RETURN_IF_ERROR(_table_reader->close());
665
0
            _table_reader.reset();
666
0
            _state->update_num_finished_scan_range(1);
667
0
            *eof = false;
668
0
            continue;
669
0
        }
670
0
        return Status::OK();
671
0
    }
672
0
}
673
674
0
Status FileScannerV2::_prepare_next_split(bool* eos) {
675
0
    if (_table_reader != nullptr) {
676
0
        RETURN_IF_ERROR(_table_reader->close());
677
0
        _table_reader.reset();
678
0
        _state->update_num_finished_scan_range(1);
679
0
    }
680
681
0
    bool has_next = _first_scan_range;
682
0
    if (!_first_scan_range) {
683
0
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
684
0
    }
685
0
    _first_scan_range = false;
686
0
    if (!has_next || _should_stop) {
687
0
        *eos = true;
688
0
        return Status::OK();
689
0
    }
690
0
    _current_range_path = _current_range.path;
691
0
    RETURN_IF_ERROR(_create_table_reader(_current_range));
692
0
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
693
0
    COUNTER_UPDATE(_file_counter, 1);
694
0
    *eos = false;
695
0
    return Status::OK();
696
0
}
697
698
0
Status FileScannerV2::_create_table_reader(const TFileRangeDesc& range) {
699
0
    const auto format_type = _get_current_format_type();
700
0
    format::FileFormat file_format;
701
0
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
702
0
    RETURN_IF_ERROR(_create_table_reader_for_format(range));
703
0
    DORIS_CHECK(_table_reader != nullptr);
704
705
0
    format::TableColumnPredicates table_column_predicates;
706
0
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
707
0
    VExprContextSPtrs table_conjuncts;
708
0
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
709
0
    RETURN_IF_ERROR(_table_reader->init({
710
0
            .projected_columns = _projected_columns,
711
0
            .column_predicates = std::move(table_column_predicates),
712
0
            .conjuncts = std::move(table_conjuncts),
713
0
            .format = file_format,
714
0
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
715
0
            .io_ctx = _io_ctx,
716
0
            .runtime_state = _state,
717
0
            .scanner_profile = _local_state->scanner_profile(),
718
0
            .allow_missing_columns = false, // TODO
719
0
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
720
0
            .profile = nullptr, // TODO
721
0
    }));
722
0
    return Status::OK();
723
0
}
724
725
0
Status FileScannerV2::_create_table_reader_for_format(const TFileRangeDesc& range) {
726
0
    const auto table_format = table_format_name(range);
727
0
    if (table_format == "NotSet" || table_format == "tvf") {
728
0
        _table_reader = std::make_unique<format::TableReader>();
729
0
    } else if (table_format == "hive") {
730
0
        _table_reader = hive::HiveReader::create_unique();
731
0
    } else if (table_format == "iceberg") {
732
0
        _table_reader = std::make_unique<iceberg::IcebergTableReader>();
733
0
    } else if (table_format == "paimon") {
734
0
        _table_reader = paimon::PaimonReader::create_unique();
735
0
    } else {
736
0
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
737
0
    }
738
0
    return Status::OK();
739
0
}
740
741
0
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
742
0
    std::map<std::string, Field> partition_values;
743
0
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
744
0
    RETURN_IF_ERROR(_table_reader->prepare_split({
745
0
            .partition_values = std::move(partition_values),
746
0
            .cache = _kv_cache,
747
0
            .current_range = range,
748
0
    }));
749
0
    return Status::OK();
750
0
}
751
752
Status FileScannerV2::_generate_partition_values(
753
0
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
754
0
    DORIS_CHECK(partition_values != nullptr);
755
0
    partition_values->clear();
756
0
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
757
0
        return Status::OK();
758
0
    }
759
0
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
760
0
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
761
0
        const auto& key = range.columns_from_path_keys[idx];
762
0
        const auto it = _partition_slot_descs.find(key);
763
0
        if (it == _partition_slot_descs.end()) {
764
0
            continue;
765
0
        }
766
0
        const auto& value = range.columns_from_path[idx];
767
0
        const bool is_null = range.__isset.columns_from_path_is_null &&
768
0
                             idx < range.columns_from_path_is_null.size() &&
769
0
                             range.columns_from_path_is_null[idx];
770
0
        Field field;
771
0
        DORIS_CHECK(it->second.slot_desc != nullptr);
772
0
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
773
0
        partition_values->emplace(it->second.canonical_name, std::move(field));
774
0
    }
775
0
    return Status::OK();
776
0
}
777
778
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
779
                                             const std::string& value, bool is_null,
780
0
                                             Field* field) const {
781
0
    DORIS_CHECK(slot_desc != nullptr);
782
0
    DORIS_CHECK(field != nullptr);
783
0
    if (is_null) {
784
0
        *field = Field::create_field<TYPE_NULL>(Null());
785
0
        return Status::OK();
786
0
    }
787
0
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
788
0
    auto column = data_type->create_column();
789
0
    auto serde = data_type->get_serde();
790
0
    DataTypeSerDe::FormatOptions options;
791
0
    options.converted_from_string = true;
792
0
    StringRef ref(value.data(), value.size());
793
0
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
794
0
    DORIS_CHECK(column->size() == 1);
795
0
    *field = (*column)[0];
796
0
    return Status::OK();
797
0
}
798
799
0
Status FileScannerV2::_init_expr_ctxes() {
800
0
    _slot_id_to_desc.clear();
801
0
    _slot_id_to_global_index.clear();
802
0
    _partition_slot_descs.clear();
803
0
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
804
0
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
805
0
    }
806
0
    RETURN_IF_ERROR(_build_projected_columns());
807
0
    return Status::OK();
808
0
}
809
810
0
Status FileScannerV2::_build_projected_columns() {
811
0
    _projected_columns.clear();
812
0
    _projected_columns.reserve(_params->required_slots.size());
813
814
0
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
815
0
        const auto& slot_info = _params->required_slots[slot_idx];
816
0
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
817
0
        if (it == _slot_id_to_desc.end()) {
818
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
819
0
                                         slot_info.slot_id);
820
0
        }
821
0
        auto column = _build_table_column(it->second);
822
0
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
823
0
        std::optional<format::ColumnDefinition> schema_column;
824
0
        if (const auto* schema_field = find_external_root_field(_params, column);
825
0
            schema_field != nullptr) {
826
            // If the column has a matching root field in the schema, use the schema field to build the column's nested children.
827
            // NOTICE: The nested `schema_column` is completed without projection.
828
0
            schema_column = build_schema_column_from_external_field(*schema_field, column.type);
829
0
            column.identifier = schema_column->identifier;
830
0
            column.name_mapping = schema_column->name_mapping;
831
0
        }
832
        // Build the column's nested children based on the column's access paths and the schema column (if exists).
833
        // The access paths are generated from the slot's access path expressions which means a projected column can have a subset of the schema column's nested children.
834
0
        RETURN_IF_ERROR(build_nested_children_from_access_paths(
835
0
                &column, it->second, schema_column.has_value() ? &*schema_column : nullptr));
836
0
        if (is_partition_slot(slot_info)) {
837
0
            column.is_partition_key = true;
838
0
            _partition_slot_descs.emplace(
839
0
                    column.name, PartitionSlotInfo {.slot_desc = it->second,
840
0
                                                    .canonical_name = column.name});
841
0
            for (const auto& alias : column.name_mapping) {
842
0
                _partition_slot_descs.emplace(
843
0
                        alias, PartitionSlotInfo {.slot_desc = it->second,
844
0
                                                  .canonical_name = column.name});
845
0
            }
846
0
        }
847
0
        const auto global_index = format::GlobalIndex(slot_idx);
848
0
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
849
0
        _projected_columns.push_back(std::move(column));
850
0
    }
851
0
    return Status::OK();
852
0
}
853
854
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
855
0
                                          VExprContextSPtr* ctx) const {
856
0
    DORIS_CHECK(ctx != nullptr);
857
0
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
858
0
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
859
0
    }
860
861
0
    if (_params->__isset.default_value_of_src_slot) {
862
0
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
863
0
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
864
0
            return VExpr::create_expr_tree(it->second, *ctx);
865
0
        }
866
0
    }
867
0
    return Status::OK();
868
0
}
869
870
0
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
871
0
    DORIS_CHECK(slot_desc != nullptr);
872
0
    format::ColumnDefinition column;
873
    // TODO(gabriel): why always BY_NAME here?
874
0
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
875
0
    column.name = slot_desc->col_name();
876
0
    column.type = slot_desc->get_data_type_ptr();
877
0
    return column;
878
0
}
879
880
Status FileScannerV2::_build_table_column_predicates(
881
0
        format::TableColumnPredicates* predicates) const {
882
0
    DORIS_CHECK(predicates != nullptr);
883
0
    predicates->clear();
884
0
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
885
0
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
886
0
        const auto it = _slot_id_to_desc.find(slot_id);
887
0
        if (it == _slot_id_to_desc.end()) {
888
0
            continue;
889
0
        }
890
0
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
891
0
        if (global_index_it == _slot_id_to_global_index.end()) {
892
0
            continue;
893
0
        }
894
0
        (*predicates)[global_index_it->second] = slot_predicate_list;
895
0
    }
896
0
    return Status::OK();
897
0
}
898
899
0
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
900
0
    DORIS_CHECK(conjuncts != nullptr);
901
0
    conjuncts->clear();
902
0
    conjuncts->reserve(_conjuncts.size());
903
0
    for (const auto& conjunct : _conjuncts) {
904
0
        VExprSPtr root;
905
0
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
906
0
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
907
0
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
908
0
    }
909
0
    return Status::OK();
910
0
}
911
912
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
913
0
    return get_range_format_type(*_params, _current_range);
914
0
}
915
916
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
917
0
                                      format::FileFormat* file_format) {
918
0
    DORIS_CHECK(file_format != nullptr);
919
0
    switch (format_type) {
920
0
    case TFileFormatType::FORMAT_PARQUET:
921
0
        *file_format = format::FileFormat::PARQUET;
922
0
        return Status::OK();
923
0
    default:
924
0
        return Status::NotSupported("FileScannerV2 does not support file format {}",
925
0
                                    to_string(format_type));
926
0
    }
927
0
}
928
929
0
Status FileScannerV2::_init_io_ctx() {
930
0
    _io_ctx = std::make_shared<io::IOContext>();
931
0
    _io_ctx->query_id = &_state->query_id();
932
0
    return Status::OK();
933
0
}
934
935
0
Status FileScannerV2::close(RuntimeState* state) {
936
0
    if (!_try_close()) {
937
0
        return Status::OK();
938
0
    }
939
0
    if (_table_reader != nullptr) {
940
0
        RETURN_IF_ERROR(_table_reader->close());
941
0
        _table_reader.reset();
942
0
    }
943
0
    return Scanner::close(state);
944
0
}
945
946
0
void FileScannerV2::try_stop() {
947
0
    _should_stop = true;
948
0
    if (_table_reader != nullptr) {
949
0
        static_cast<void>(_table_reader->close());
950
0
    }
951
0
}
952
953
0
void FileScannerV2::update_realtime_counters() {
954
0
    if (_file_reader_stats == nullptr) {
955
0
        return;
956
0
    }
957
0
    const int64_t bytes_read = _file_reader_stats->read_bytes;
958
0
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
959
0
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
960
0
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
961
0
}
962
963
} // namespace doris