Coverage Report

Created: 2026-05-17 18:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/schema_desc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/schema_desc.h"
19
20
#include <algorithm>
21
#include <ostream>
22
#include <utility>
23
24
#include "common/cast_set.h"
25
#include "common/logging.h"
26
#include "core/data_type/data_type_array.h"
27
#include "core/data_type/data_type_factory.hpp"
28
#include "core/data_type/data_type_map.h"
29
#include "core/data_type/data_type_struct.h"
30
#include "core/data_type/data_type_variant.h"
31
#include "core/data_type/define_primitive_type.h"
32
#include "format/generic_reader.h"
33
#include "format/table/table_schema_change_helper.h"
34
#include "util/slice.h"
35
#include "util/string_util.h"
36
37
namespace doris {
38
39
2.58k
static bool is_group_node(const tparquet::SchemaElement& schema) {
40
2.58k
    return schema.num_children > 0;
41
2.58k
}
42
43
488
static bool is_list_node(const tparquet::SchemaElement& schema) {
44
488
    return schema.__isset.converted_type && schema.converted_type == tparquet::ConvertedType::LIST;
45
488
}
46
47
570
static bool is_map_node(const tparquet::SchemaElement& schema) {
48
570
    return schema.__isset.converted_type &&
49
570
           (schema.converted_type == tparquet::ConvertedType::MAP ||
50
244
            schema.converted_type == tparquet::ConvertedType::MAP_KEY_VALUE);
51
570
}
52
53
2.29k
static bool is_repeated_node(const tparquet::SchemaElement& schema) {
54
2.29k
    return schema.__isset.repetition_type &&
55
2.29k
           schema.repetition_type == tparquet::FieldRepetitionType::REPEATED;
56
2.29k
}
57
58
82
static bool is_required_node(const tparquet::SchemaElement& schema) {
59
82
    return schema.__isset.repetition_type &&
60
82
           schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED;
61
82
}
62
63
2.38k
static bool is_optional_node(const tparquet::SchemaElement& schema) {
64
2.38k
    return schema.__isset.repetition_type &&
65
2.38k
           schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL;
66
2.38k
}
67
68
580
static bool is_variant_node(const tparquet::SchemaElement& schema) {
69
580
    return schema.__isset.logicalType && schema.logicalType.__isset.VARIANT;
70
580
}
71
72
31
static void mark_variant_subfields(FieldSchema* field) {
73
31
    field->is_in_variant = true;
74
31
    for (auto& child : field->children) {
75
26
        mark_variant_subfields(&child);
76
26
    }
77
31
}
78
79
18
static bool is_unannotated_binary_field(const FieldSchema& field) {
80
18
    return field.physical_type == tparquet::Type::BYTE_ARRAY &&
81
18
           !field.parquet_schema.__isset.logicalType &&
82
18
           !field.parquet_schema.__isset.converted_type;
83
18
}
84
85
162
static int num_children_node(const tparquet::SchemaElement& schema) {
86
162
    return schema.__isset.num_children ? schema.num_children : 0;
87
162
}
88
89
/**
90
 * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
91
 * Empty array/map values are not stored in doris columns, so have to use `repeated_parent_def_level` to skip the
92
 * empty or null values in ancestor node.
93
 *
94
 * For instance, considering an array of strings with 3 rows like the following:
95
 * null, [], [a, b, c]
96
 * We can store four elements in data column: null, a, b, c
97
 * and the offsets column is: 1, 1, 4
98
 * and the null map is: 1, 0, 0
99
 * For the i-th row in array column: range from `offsets[i - 1]` until `offsets[i]` represents the elements in this row,
100
 * so we can't store empty array/map values in doris data column.
101
 * As a comparison, spark does not require `repeated_parent_def_level`,
102
 * because the spark column stores empty array/map values , and use anther length column to indicate empty values.
103
 * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
104
 *
105
 * Furthermore, we can also avoid store null array/map values in doris data column.
106
 * The same three rows as above, We can only store three elements in data column: a, b, c
107
 * and the offsets column is: 0, 0, 3
108
 * and the null map is: 1, 0, 0
109
 *
110
 * Inherit the repetition and definition level from parent node, if the parent node is repeated,
111
 * we should set repeated_parent_def_level = definition_level, otherwise as repeated_parent_def_level.
112
 * @param parent parent node
113
 * @param repeated_parent_def_level the first ancestor node whose repetition_type equals REPEATED
114
 */
115
580
static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
116
1.20k
    for (auto& child : parent->children) {
117
1.20k
        child.repetition_level = parent->repetition_level;
118
1.20k
        child.definition_level = parent->definition_level;
119
1.20k
        child.repeated_parent_def_level = repeated_parent_def_level;
120
1.20k
    }
121
580
}
122
123
162
static bool is_struct_list_node(const tparquet::SchemaElement& schema) {
124
162
    const std::string& name = schema.name;
125
162
    static const Slice array_slice("array", 5);
126
162
    static const Slice tuple_slice("_tuple", 6);
127
162
    Slice slice(name);
128
162
    return slice == array_slice || slice.ends_with(tuple_slice);
129
162
}
130
131
0
std::string FieldSchema::debug_string() const {
132
0
    std::stringstream ss;
133
0
    ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level;
134
0
    if (children.size() > 0) {
135
0
        ss << ", type=" << data_type->get_name() << ", children=[";
136
0
        for (int i = 0; i < children.size(); ++i) {
137
0
            if (i != 0) {
138
0
                ss << ", ";
139
0
            }
140
0
            ss << children[i].debug_string();
141
0
        }
142
0
        ss << "]";
143
0
    } else {
144
0
        ss << ", physical_type=" << physical_type;
145
0
        ss << " , doris_type=" << data_type->get_name();
146
0
    }
147
0
    ss << ")";
148
0
    return ss.str();
149
0
}
150
151
120
Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) {
152
120
    if (t_schemas.size() == 0 || !is_group_node(t_schemas[0])) {
153
0
        return Status::InvalidArgument("Wrong parquet root schema element");
154
0
    }
155
120
    const auto& root_schema = t_schemas[0];
156
120
    _fields.resize(root_schema.num_children);
157
120
    _next_schema_pos = 1;
158
159
1.29k
    for (int i = 0; i < root_schema.num_children; ++i) {
160
1.18k
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
161
1.17k
        if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
162
0
            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
163
0
        }
164
1.17k
        _name_to_field.emplace(_fields[i].name, &_fields[i]);
165
1.17k
    }
166
167
115
    if (_next_schema_pos != t_schemas.size()) {
168
0
        return Status::InvalidArgument("Remaining {} unparsed schema elements",
169
0
                                       t_schemas.size() - _next_schema_pos);
170
0
    }
171
172
115
    return Status::OK();
173
115
}
174
175
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
176
2.38k
                                         size_t curr_pos, FieldSchema* node_field) {
177
2.38k
    if (curr_pos >= t_schemas.size()) {
178
0
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
179
0
    }
180
2.38k
    auto& t_schema = t_schemas[curr_pos];
181
2.38k
    if (is_group_node(t_schema)) {
182
        // nested structure or nullable list
183
580
        return parse_group_field(t_schemas, curr_pos, node_field);
184
580
    }
185
1.80k
    if (is_repeated_node(t_schema)) {
186
        // repeated <primitive-type> <name> (LIST)
187
        // produce required list<element>
188
0
        node_field->repetition_level++;
189
0
        node_field->definition_level++;
190
0
        node_field->children.resize(1);
191
0
        set_child_node_level(node_field, node_field->definition_level);
192
0
        auto child = &node_field->children[0];
193
0
        parse_physical_field(t_schema, false, child);
194
195
0
        node_field->name = t_schema.name;
196
0
        node_field->lower_case_name = to_lower(t_schema.name);
197
0
        node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
198
0
        _next_schema_pos = curr_pos + 1;
199
0
        node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
200
1.80k
    } else {
201
1.80k
        bool is_optional = is_optional_node(t_schema);
202
1.80k
        if (is_optional) {
203
1.67k
            node_field->definition_level++;
204
1.67k
        }
205
1.80k
        parse_physical_field(t_schema, is_optional, node_field);
206
1.80k
        _next_schema_pos = curr_pos + 1;
207
1.80k
    }
208
1.80k
    return Status::OK();
209
2.38k
}
210
211
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
212
1.80k
                                           bool is_nullable, FieldSchema* physical_field) {
213
1.80k
    physical_field->name = physical_schema.name;
214
1.80k
    physical_field->lower_case_name = to_lower(physical_field->name);
215
1.80k
    physical_field->parquet_schema = physical_schema;
216
1.80k
    physical_field->physical_type = physical_schema.type;
217
1.80k
    physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
218
1.80k
    _physical_fields.push_back(physical_field);
219
1.80k
    physical_field->physical_column_index = cast_set<int>(_physical_fields.size() - 1);
220
1.80k
    auto type = get_doris_type(physical_schema, is_nullable);
221
1.80k
    physical_field->data_type = type.first;
222
1.80k
    physical_field->is_type_compatibility = type.second;
223
1.80k
    physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
224
1.80k
}
225
226
std::pair<DataTypePtr, bool> FieldDescriptor::get_doris_type(
227
1.80k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
228
1.80k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
229
1.80k
    try {
230
1.80k
        if (physical_schema.__isset.logicalType) {
231
761
            ans = convert_to_doris_type(physical_schema.logicalType, nullable);
232
1.04k
        } else if (physical_schema.__isset.converted_type) {
233
226
            ans = convert_to_doris_type(physical_schema, nullable);
234
226
        }
235
1.80k
    } catch (...) {
236
        // now the Not supported exception are ignored
237
        // so those byte_array maybe be treated as varbinary(now) : string(before)
238
0
    }
239
1.80k
    if (ans.first->get_primitive_type() == PrimitiveType::INVALID_TYPE) {
240
817
        switch (physical_schema.type) {
241
96
        case tparquet::Type::BOOLEAN:
242
96
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, nullable);
243
96
            break;
244
198
        case tparquet::Type::INT32:
245
198
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
246
198
            break;
247
135
        case tparquet::Type::INT64:
248
135
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
249
135
            break;
250
14
        case tparquet::Type::INT96:
251
14
            if (_enable_mapping_timestamp_tz) {
252
                // treat INT96 as TIMESTAMPTZ
253
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, nullable,
254
0
                                                                         0, 6);
255
14
            } else {
256
                // in most cases, it's a nano timestamp
257
14
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable,
258
14
                                                                         0, 6);
259
14
            }
260
14
            break;
261
96
        case tparquet::Type::FLOAT:
262
96
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
263
96
            break;
264
217
        case tparquet::Type::DOUBLE:
265
217
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, nullable);
266
217
            break;
267
61
        case tparquet::Type::BYTE_ARRAY:
268
61
            if (_enable_mapping_varbinary) {
269
                // if physical_schema not set logicalType and converted_type,
270
                // we treat BYTE_ARRAY as VARBINARY by default, so that we can read all data directly.
271
22
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable);
272
39
            } else {
273
39
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
274
39
            }
275
61
            break;
276
0
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
277
0
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
278
0
            break;
279
0
        default:
280
0
            throw Exception(Status::InternalError("Not supported parquet logicalType{}",
281
0
                                                  physical_schema.type));
282
0
            break;
283
817
        }
284
817
    }
285
1.80k
    return ans;
286
1.80k
}
287
288
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
289
761
        tparquet::LogicalType logicalType, bool nullable) {
290
761
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
291
761
    bool& is_type_compatibility = ans.second;
292
761
    if (logicalType.__isset.STRING) {
293
509
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
294
509
    } else if (logicalType.__isset.DECIMAL) {
295
118
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, nullable,
296
118
                                                                 logicalType.DECIMAL.precision,
297
118
                                                                 logicalType.DECIMAL.scale);
298
134
    } else if (logicalType.__isset.DATE) {
299
59
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
300
75
    } else if (logicalType.__isset.INTEGER) {
301
0
        if (logicalType.INTEGER.isSigned) {
302
0
            if (logicalType.INTEGER.bitWidth <= 8) {
303
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
304
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
305
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
306
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
307
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
308
0
            } else {
309
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
310
0
            }
311
0
        } else {
312
0
            is_type_compatibility = true;
313
0
            if (logicalType.INTEGER.bitWidth <= 8) {
314
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
315
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
316
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
317
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
318
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
319
0
            } else {
320
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
321
0
            }
322
0
        }
323
75
    } else if (logicalType.__isset.TIME) {
324
0
        ans.first = DataTypeFactory::instance().create_data_type(
325
0
                TYPE_TIMEV2, nullable, 0, logicalType.TIME.unit.__isset.MILLIS ? 3 : 6);
326
75
    } else if (logicalType.__isset.TIMESTAMP) {
327
71
        if (_enable_mapping_timestamp_tz) {
328
0
            if (logicalType.TIMESTAMP.isAdjustedToUTC) {
329
                // treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
330
0
                ans.first = DataTypeFactory::instance().create_data_type(
331
0
                        TYPE_TIMESTAMPTZ, nullable, 0,
332
0
                        logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
333
0
                return ans;
334
0
            }
335
0
        }
336
71
        ans.first = DataTypeFactory::instance().create_data_type(
337
71
                TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
338
71
    } else if (logicalType.__isset.JSON) {
339
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
340
4
    } else if (logicalType.__isset.UUID) {
341
4
        if (_enable_mapping_varbinary) {
342
3
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1,
343
3
                                                                     -1, 16);
344
3
        } else {
345
1
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
346
1
        }
347
4
    } else if (logicalType.__isset.FLOAT16) {
348
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
349
0
    } else {
350
0
        throw Exception(Status::InternalError("Not supported parquet logicalType"));
351
0
    }
352
761
    return ans;
353
761
}
354
355
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
356
226
        const tparquet::SchemaElement& physical_schema, bool nullable) {
357
226
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
358
226
    bool& is_type_compatibility = ans.second;
359
226
    switch (physical_schema.converted_type) {
360
106
    case tparquet::ConvertedType::type::UTF8:
361
106
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
362
106
        break;
363
24
    case tparquet::ConvertedType::type::DECIMAL:
364
24
        ans.first = DataTypeFactory::instance().create_data_type(
365
24
                TYPE_DECIMAL128I, nullable, physical_schema.precision, physical_schema.scale);
366
24
        break;
367
24
    case tparquet::ConvertedType::type::DATE:
368
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
369
24
        break;
370
0
    case tparquet::ConvertedType::type::TIME_MILLIS:
371
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 3);
372
0
        break;
373
0
    case tparquet::ConvertedType::type::TIME_MICROS:
374
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 6);
375
0
        break;
376
0
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
377
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 3);
378
0
        break;
379
24
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
380
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 6);
381
24
        break;
382
24
    case tparquet::ConvertedType::type::INT_8:
383
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
384
24
        break;
385
0
    case tparquet::ConvertedType::type::UINT_8:
386
0
        is_type_compatibility = true;
387
0
        [[fallthrough]];
388
24
    case tparquet::ConvertedType::type::INT_16:
389
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
390
24
        break;
391
0
    case tparquet::ConvertedType::type::UINT_16:
392
0
        is_type_compatibility = true;
393
0
        [[fallthrough]];
394
0
    case tparquet::ConvertedType::type::INT_32:
395
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
396
0
        break;
397
0
    case tparquet::ConvertedType::type::UINT_32:
398
0
        is_type_compatibility = true;
399
0
        [[fallthrough]];
400
0
    case tparquet::ConvertedType::type::INT_64:
401
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
402
0
        break;
403
0
    case tparquet::ConvertedType::type::UINT_64:
404
0
        is_type_compatibility = true;
405
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
406
0
        break;
407
0
    case tparquet::ConvertedType::type::JSON:
408
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
409
0
        break;
410
0
    default:
411
0
        throw Exception(Status::InternalError("Not supported parquet ConvertedType: {}",
412
0
                                              physical_schema.converted_type));
413
226
    }
414
226
    return ans;
415
226
}
416
417
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
418
580
                                          size_t curr_pos, FieldSchema* group_field) {
419
580
    const auto& group_schema = t_schemas[curr_pos];
420
580
    if (is_variant_node(group_schema)) {
421
10
        return parse_variant_field(t_schemas, curr_pos, group_field);
422
10
    }
423
570
    if (is_map_node(group_schema)) {
424
        // the map definition:
425
        // optional group <name> (MAP) {
426
        //   repeated group map (MAP_KEY_VALUE) {
427
        //     required <type> key;
428
        //     optional <type> value;
429
        //   }
430
        // }
431
82
        return parse_map_field(t_schemas, curr_pos, group_field);
432
82
    }
433
488
    if (is_list_node(group_schema)) {
434
        // the list definition:
435
        // optional group <name> (LIST) {
436
        //   repeated group [bag | list] { // hive or spark
437
        //     optional <type> [array_element | element]; // hive or spark
438
        //   }
439
        // }
440
162
        return parse_list_field(t_schemas, curr_pos, group_field);
441
162
    }
442
443
326
    if (is_repeated_node(group_schema)) {
444
0
        group_field->repetition_level++;
445
0
        group_field->definition_level++;
446
0
        group_field->children.resize(1);
447
0
        set_child_node_level(group_field, group_field->definition_level);
448
0
        auto struct_field = &group_field->children[0];
449
        // the list of struct:
450
        // repeated group <name> (LIST) {
451
        //   optional/required <type> <name>;
452
        //   ...
453
        // }
454
        // produce a non-null list<struct>
455
0
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field));
456
457
0
        group_field->name = group_schema.name;
458
0
        group_field->lower_case_name = to_lower(group_field->name);
459
0
        group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
460
0
        group_field->data_type =
461
0
                std::make_shared<DataTypeArray>(make_nullable(struct_field->data_type));
462
0
        group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
463
326
    } else {
464
326
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
465
326
    }
466
467
326
    return Status::OK();
468
326
}
469
470
Status FieldDescriptor::parse_variant_field(const std::vector<tparquet::SchemaElement>& t_schemas,
471
10
                                            size_t curr_pos, FieldSchema* variant_field) {
472
10
    RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, variant_field));
473
474
10
    bool has_metadata = false;
475
10
    bool metadata_required = false;
476
10
    bool has_value = false;
477
10
    bool has_typed_value = false;
478
24
    for (const auto& child : variant_field->children) {
479
24
        if (child.lower_case_name == "metadata") {
480
10
            if (has_metadata) {
481
0
                return Status::InvalidArgument(
482
0
                        "Parquet VARIANT field '{}' has duplicate metadata child",
483
0
                        variant_field->name);
484
0
            }
485
10
            if (!is_unannotated_binary_field(child)) {
486
1
                return Status::InvalidArgument(
487
1
                        "Parquet VARIANT field '{}' metadata child must be unannotated binary",
488
1
                        variant_field->name);
489
1
            }
490
9
            has_metadata = true;
491
9
            metadata_required = !child.data_type->is_nullable();
492
14
        } else if (child.lower_case_name == "value") {
493
9
            if (has_value) {
494
1
                return Status::InvalidArgument(
495
1
                        "Parquet VARIANT field '{}' has duplicate value child",
496
1
                        variant_field->name);
497
1
            }
498
8
            if (!is_unannotated_binary_field(child)) {
499
2
                return Status::InvalidArgument(
500
2
                        "Parquet VARIANT field '{}' value child must be unannotated binary",
501
2
                        variant_field->name);
502
2
            }
503
6
            has_value = true;
504
6
        } else if (child.lower_case_name == "typed_value") {
505
4
            if (has_typed_value) {
506
0
                return Status::InvalidArgument(
507
0
                        "Parquet VARIANT field '{}' has duplicate typed_value child",
508
0
                        variant_field->name);
509
0
            }
510
4
            has_typed_value = true;
511
4
        } else {
512
1
            return Status::InvalidArgument("Parquet VARIANT field '{}' has unexpected child '{}'",
513
1
                                           variant_field->name, child.name);
514
1
        }
515
24
    }
516
5
    if (!has_metadata || !metadata_required || (!has_value && !has_typed_value)) {
517
0
        return Status::InvalidArgument(
518
0
                "Parquet VARIANT field '{}' must contain required binary metadata and at least one "
519
0
                "binary value or typed_value field",
520
0
                variant_field->name);
521
0
    }
522
523
5
    variant_field->data_type = std::make_shared<DataTypeVariant>(0, false);
524
5
    if (is_optional_node(t_schemas[curr_pos])) {
525
3
        variant_field->data_type = make_nullable(variant_field->data_type);
526
3
    }
527
5
    mark_variant_subfields(variant_field);
528
5
    return Status::OK();
529
5
}
530
531
Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas,
532
162
                                         size_t curr_pos, FieldSchema* list_field) {
533
    // the list definition:
534
    // spark and hive have three level schemas but with different schema name
535
    // spark: <column-name> - "list" - "element"
536
    // hive: <column-name> - "bag" - "array_element"
537
    // parse three level schemas to two level primitive like: LIST<INT>,
538
    // or nested structure like: LIST<MAP<INT, INT>>
539
162
    auto& first_level = t_schemas[curr_pos];
540
162
    if (first_level.num_children != 1) {
541
0
        return Status::InvalidArgument("List element should have only one child");
542
0
    }
543
544
162
    if (curr_pos + 1 >= t_schemas.size()) {
545
0
        return Status::InvalidArgument("List element should have the second level schema");
546
0
    }
547
548
162
    if (first_level.repetition_type == tparquet::FieldRepetitionType::REPEATED) {
549
0
        return Status::InvalidArgument("List element can't be a repeated schema");
550
0
    }
551
552
    // the repeated schema element
553
162
    auto& second_level = t_schemas[curr_pos + 1];
554
162
    if (second_level.repetition_type != tparquet::FieldRepetitionType::REPEATED) {
555
0
        return Status::InvalidArgument("The second level of list element should be repeated");
556
0
    }
557
558
    // This indicates if this list is nullable.
559
162
    bool is_optional = is_optional_node(first_level);
560
162
    if (is_optional) {
561
123
        list_field->definition_level++;
562
123
    }
563
162
    list_field->repetition_level++;
564
162
    list_field->definition_level++;
565
162
    list_field->children.resize(1);
566
162
    FieldSchema* list_child = &list_field->children[0];
567
568
162
    size_t num_children = num_children_node(second_level);
569
162
    if (num_children > 0) {
570
162
        if (num_children == 1 && !is_struct_list_node(second_level)) {
571
            // optional field, and the third level element is the nested structure in list
572
            // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>>
573
            // skip bag/list, it's a repeated element.
574
162
            set_child_node_level(list_field, list_field->definition_level);
575
162
            RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child));
576
162
        } else {
577
            // required field, produce the list of struct
578
0
            set_child_node_level(list_field, list_field->definition_level);
579
0
            RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child));
580
0
        }
581
162
    } else if (num_children == 0) {
582
        // required two level list, for compatibility reason.
583
0
        set_child_node_level(list_field, list_field->definition_level);
584
0
        parse_physical_field(second_level, false, list_child);
585
0
        _next_schema_pos = curr_pos + 2;
586
0
    }
587
588
162
    list_field->name = first_level.name;
589
162
    list_field->lower_case_name = to_lower(first_level.name);
590
162
    list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
591
162
    list_field->data_type =
592
162
            std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
593
162
    if (is_optional) {
594
123
        list_field->data_type = make_nullable(list_field->data_type);
595
123
    }
596
162
    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;
597
598
162
    return Status::OK();
599
162
}
600
601
Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas,
602
82
                                        size_t curr_pos, FieldSchema* map_field) {
603
    // the map definition in parquet:
604
    // optional group <name> (MAP) {
605
    //   repeated group map (MAP_KEY_VALUE) {
606
    //     required <type> key;
607
    //     optional <type> value;
608
    //   }
609
    // }
610
    // Map value can be optional, the map without values is a SET
611
82
    if (curr_pos + 2 >= t_schemas.size()) {
612
0
        return Status::InvalidArgument("Map element should have at least three levels");
613
0
    }
614
82
    auto& map_schema = t_schemas[curr_pos];
615
82
    if (map_schema.num_children != 1) {
616
0
        return Status::InvalidArgument(
617
0
                "Map element should have only one child(name='map', type='MAP_KEY_VALUE')");
618
0
    }
619
82
    if (is_repeated_node(map_schema)) {
620
0
        return Status::InvalidArgument("Map element can't be a repeated schema");
621
0
    }
622
82
    auto& map_key_value = t_schemas[curr_pos + 1];
623
82
    if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) {
624
0
        return Status::InvalidArgument(
625
0
                "the second level in map must be a repeated group(key and value)");
626
0
    }
627
82
    auto& map_key = t_schemas[curr_pos + 2];
628
82
    if (!is_required_node(map_key)) {
629
0
        LOG(WARNING) << "Filed " << map_schema.name << " is map type, but with nullable key column";
630
0
    }
631
632
82
    if (map_key_value.num_children == 1) {
633
        // The map with three levels is a SET
634
0
        return parse_list_field(t_schemas, curr_pos, map_field);
635
0
    }
636
82
    if (map_key_value.num_children != 2) {
637
        // A standard map should have four levels
638
0
        return Status::InvalidArgument(
639
0
                "the second level in map(MAP_KEY_VALUE) should have two children");
640
0
    }
641
    // standard map
642
82
    bool is_optional = is_optional_node(map_schema);
643
82
    if (is_optional) {
644
56
        map_field->definition_level++;
645
56
    }
646
82
    map_field->repetition_level++;
647
82
    map_field->definition_level++;
648
649
    // Directly create key and value children instead of intermediate key_value node
650
82
    map_field->children.resize(2);
651
    // map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
652
82
    set_child_node_level(map_field, map_field->definition_level);
653
654
82
    auto key_field = &map_field->children[0];
655
82
    auto value_field = &map_field->children[1];
656
657
    // Parse key and value fields directly from the key_value group's children
658
82
    _next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
659
82
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
660
82
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));
661
662
82
    map_field->name = map_schema.name;
663
82
    map_field->lower_case_name = to_lower(map_field->name);
664
82
    map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
665
82
    map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
666
82
                                                         make_nullable(value_field->data_type));
667
82
    if (is_optional) {
668
56
        map_field->data_type = make_nullable(map_field->data_type);
669
56
    }
670
82
    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;
671
672
82
    return Status::OK();
673
82
}
674
675
Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas,
676
336
                                           size_t curr_pos, FieldSchema* struct_field) {
677
    // the nested column in parquet, parse group to struct.
678
336
    auto& struct_schema = t_schemas[curr_pos];
679
336
    bool is_optional = is_optional_node(struct_schema);
680
336
    if (is_optional) {
681
308
        struct_field->definition_level++;
682
308
    }
683
336
    auto num_children = struct_schema.num_children;
684
336
    struct_field->children.resize(num_children);
685
336
    set_child_node_level(struct_field, struct_field->repeated_parent_def_level);
686
336
    _next_schema_pos = curr_pos + 1;
687
1.21k
    for (int i = 0; i < num_children; ++i) {
688
878
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
689
878
    }
690
336
    struct_field->name = struct_schema.name;
691
336
    struct_field->lower_case_name = to_lower(struct_field->name);
692
336
    struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
693
694
336
    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
695
336
    DataTypes res_data_types;
696
336
    std::vector<String> names;
697
1.21k
    for (int i = 0; i < num_children; ++i) {
698
878
        res_data_types.push_back(make_nullable(struct_field->children[i].data_type));
699
878
        names.push_back(struct_field->children[i].name);
700
878
    }
701
336
    struct_field->data_type = std::make_shared<DataTypeStruct>(res_data_types, names);
702
336
    if (is_optional) {
703
308
        struct_field->data_type = make_nullable(struct_field->data_type);
704
308
    }
705
336
    return Status::OK();
706
336
}
707
708
5
int FieldDescriptor::get_column_index(const std::string& column) const {
709
15
    for (int32_t i = 0; i < _fields.size(); i++) {
710
15
        if (_fields[i].name == column) {
711
5
            return i;
712
5
        }
713
15
    }
714
0
    return -1;
715
5
}
716
717
1.03k
FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
718
1.03k
    auto it = _name_to_field.find(name);
719
1.03k
    if (it != _name_to_field.end()) {
720
1.03k
        return it->second;
721
1.03k
    }
722
0
    throw Exception(Status::InternalError("Name {} not found in FieldDescriptor!", name));
723
0
    return nullptr;
724
1.03k
}
725
726
namespace {
727
728
4.83k
void collect_physical_fields(FieldSchema* field, std::vector<FieldSchema*>* physical_fields) {
729
4.83k
    if (field->children.empty()) {
730
3.21k
        if (field->physical_column_index >= 0) {
731
3.21k
            field->physical_column_index = cast_set<int>(physical_fields->size());
732
3.21k
            physical_fields->push_back(field);
733
3.21k
        }
734
3.21k
        return;
735
3.21k
    }
736
3.38k
    for (auto& child : field->children) {
737
3.38k
        collect_physical_fields(&child, physical_fields);
738
3.38k
    }
739
1.62k
}
740
741
} // namespace
742
743
157
void FieldDescriptor::rebuild_indexes() {
744
157
    _physical_fields.clear();
745
157
    _name_to_field.clear();
746
1.44k
    for (auto& field : _fields) {
747
1.44k
        _name_to_field.emplace(field.name, &field);
748
1.44k
        collect_physical_fields(&field, &_physical_fields);
749
1.44k
    }
750
157
}
751
752
14
void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const {
753
14
    names->clear();
754
210
    for (const FieldSchema& f : _fields) {
755
210
        names->emplace(f.name);
756
210
    }
757
14
}
758
759
0
std::string FieldDescriptor::debug_string() const {
760
0
    std::stringstream ss;
761
0
    ss << "fields=[";
762
0
    for (int i = 0; i < _fields.size(); ++i) {
763
0
        if (i != 0) {
764
0
            ss << ", ";
765
0
        }
766
0
        ss << _fields[i].debug_string();
767
0
    }
768
0
    ss << "]";
769
0
    return ss.str();
770
0
}
771
772
67
void FieldDescriptor::assign_ids() {
773
67
    uint64_t next_id = 1;
774
610
    for (auto& field : _fields) {
775
610
        field.assign_ids(next_id);
776
610
    }
777
67
}
778
779
67
FieldDescriptor FieldDescriptor::copy_with_assigned_ids() const {
780
67
    FieldDescriptor copy = *this;
781
67
    copy.rebuild_indexes();
782
67
    copy.assign_ids();
783
67
    return copy;
784
67
}
785
786
0
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
787
0
    for (const auto& field : _fields) {
788
0
        if (auto result = field.find_column_by_id(column_id)) {
789
0
            return result;
790
0
        }
791
0
    }
792
0
    return nullptr;
793
0
}
794
795
2.56k
void FieldSchema::assign_ids(uint64_t& next_id) {
796
2.56k
    column_id = next_id++;
797
798
2.56k
    for (auto& child : children) {
799
1.90k
        child.assign_ids(next_id);
800
1.90k
    }
801
802
2.56k
    max_column_id = next_id - 1;
803
2.56k
}
804
805
0
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
806
0
    if (column_id == target_id) {
807
0
        return this;
808
0
    }
809
810
0
    for (const auto& child : children) {
811
0
        if (auto result = child.find_column_by_id(target_id)) {
812
0
            return result;
813
0
        }
814
0
    }
815
816
0
    return nullptr;
817
0
}
818
819
902
uint64_t FieldSchema::get_column_id() const {
820
902
    return column_id;
821
902
}
822
823
0
void FieldSchema::set_column_id(uint64_t id) {
824
0
    column_id = id;
825
0
}
826
827
149
uint64_t FieldSchema::get_max_column_id() const {
828
149
    return max_column_id;
829
149
}
830
831
} // namespace doris