Coverage Report

Created: 2026-05-13 13:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/schema_desc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/schema_desc.h"
19
20
#include <ctype.h>
21
22
#include <algorithm>
23
#include <ostream>
24
#include <utility>
25
26
#include "common/cast_set.h"
27
#include "common/logging.h"
28
#include "core/data_type/data_type_array.h"
29
#include "core/data_type/data_type_factory.hpp"
30
#include "core/data_type/data_type_map.h"
31
#include "core/data_type/data_type_struct.h"
32
#include "core/data_type/data_type_variant.h"
33
#include "core/data_type/define_primitive_type.h"
34
#include "format/generic_reader.h"
35
#include "format/table/table_schema_change_helper.h"
36
#include "util/slice.h"
37
#include "util/string_util.h"
38
39
namespace doris {
40
41
2.40k
// Returns true when the schema element declares at least one child,
// i.e. it is a group (non-leaf) node of the parquet schema tree.
static bool is_group_node(const tparquet::SchemaElement& schema) {
    const bool has_children = schema.num_children > 0;
    return has_children;
}
44
45
442
// Returns true when the schema element carries the LIST converted type.
static bool is_list_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.converted_type) {
        return false;
    }
    return schema.converted_type == tparquet::ConvertedType::LIST;
}
48
49
518
// Returns true when the schema element carries the MAP or MAP_KEY_VALUE converted type.
static bool is_map_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.converted_type) {
        return false;
    }
    const auto converted = schema.converted_type;
    return converted == tparquet::ConvertedType::MAP ||
           converted == tparquet::ConvertedType::MAP_KEY_VALUE;
}
54
55
2.14k
// Returns true when the repetition type is explicitly set and equals REPEATED.
static bool is_repeated_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::REPEATED;
}
59
60
76
// Returns true when the repetition type is explicitly set and equals REQUIRED.
static bool is_required_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED;
}
64
65
2.22k
// Returns true when the repetition type is explicitly set and equals OPTIONAL.
static bool is_optional_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL;
}
69
70
518
// Returns true when the schema element has a logical type and that logical type is VARIANT.
static bool is_variant_node(const tparquet::SchemaElement& schema) {
    if (!schema.__isset.logicalType) {
        return false;
    }
    return schema.logicalType.__isset.VARIANT;
}
73
74
152
// Returns the declared number of children, or 0 when `num_children` is not set.
static int num_children_node(const tparquet::SchemaElement& schema) {
    if (schema.__isset.num_children) {
        return schema.num_children;
    }
    return 0;
}
77
78
/**
79
 * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
80
 * Empty array/map values are not stored in doris columns, so have to use `repeated_parent_def_level` to skip the
81
 * empty or null values in ancestor node.
82
 *
83
 * For instance, considering an array of strings with 3 rows like the following:
84
 * null, [], [a, b, c]
85
 * We can store four elements in data column: null, a, b, c
86
 * and the offsets column is: 1, 1, 4
87
 * and the null map is: 1, 0, 0
88
 * For the i-th row in array column: range from `offsets[i - 1]` until `offsets[i]` represents the elements in this row,
89
 * so we can't store empty array/map values in doris data column.
90
 * As a comparison, spark does not require `repeated_parent_def_level`,
91
 * because the spark column stores empty array/map values , and use anther length column to indicate empty values.
92
 * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
93
 *
94
 * Furthermore, we can also avoid store null array/map values in doris data column.
95
 * The same three rows as above, We can only store three elements in data column: a, b, c
96
 * and the offsets column is: 0, 0, 3
97
 * and the null map is: 1, 0, 0
98
 *
99
 * Inherit the repetition and definition level from parent node, if the parent node is repeated,
100
 * we should set repeated_parent_def_level = definition_level, otherwise as repeated_parent_def_level.
101
 * @param parent parent node
102
 * @param repeated_parent_def_level the first ancestor node whose repetition_type equals REPEATED
103
 */
104
518
static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
105
1.08k
    for (auto& child : parent->children) {
106
1.08k
        child.repetition_level = parent->repetition_level;
107
1.08k
        child.definition_level = parent->definition_level;
108
1.08k
        child.repeated_parent_def_level = repeated_parent_def_level;
109
1.08k
    }
110
518
}
111
112
152
// Returns true when the element name is exactly "array" or ends with "_tuple".
// parse_list_field uses this to decide whether the repeated group itself is the list element.
static bool is_struct_list_node(const tparquet::SchemaElement& schema) {
    static const Slice array_slice("array", 5);
    static const Slice tuple_slice("_tuple", 6);
    Slice name_slice(schema.name);
    if (name_slice == array_slice) {
        return true;
    }
    return name_slice.ends_with(tuple_slice);
}
119
120
0
std::string FieldSchema::debug_string() const {
121
0
    std::stringstream ss;
122
0
    ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level;
123
0
    if (children.size() > 0) {
124
0
        ss << ", type=" << data_type->get_name() << ", children=[";
125
0
        for (int i = 0; i < children.size(); ++i) {
126
0
            if (i != 0) {
127
0
                ss << ", ";
128
0
            }
129
0
            ss << children[i].debug_string();
130
0
        }
131
0
        ss << "]";
132
0
    } else {
133
0
        ss << ", physical_type=" << physical_type;
134
0
        ss << " , doris_type=" << data_type->get_name();
135
0
    }
136
0
    ss << ")";
137
0
    return ss.str();
138
0
}
139
140
107
// Builds the top-level fields from the flattened thrift schema list.
// t_schemas[0] must be a group node (the root); each of its children is parsed recursively
// starting at position 1. Returns InvalidArgument when the root is missing or not a group,
// when two top-level fields share a name, or when schema elements remain after all
// top-level fields were parsed.
Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) {
    if (t_schemas.empty() || !is_group_node(t_schemas[0])) {
        return Status::InvalidArgument("Wrong parquet root schema element");
    }
    const auto& root_schema = t_schemas[0];
    // Size the vector once up front; pointers to its elements are stored below.
    _fields.resize(root_schema.num_children);
    _next_schema_pos = 1;

    for (int idx = 0; idx < root_schema.num_children; ++idx) {
        auto& field = _fields[idx];
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &field));
        const bool already_known = _name_to_field.find(field.name) != _name_to_field.end();
        if (already_known) {
            return Status::InvalidArgument("Duplicated field name: {}", field.name);
        }
        _name_to_field.emplace(field.name, &field);
    }

    // Every schema element must have been consumed by the recursive parse.
    if (_next_schema_pos != t_schemas.size()) {
        return Status::InvalidArgument("Remaining {} unparsed schema elements",
                                       t_schemas.size() - _next_schema_pos);
    }

    return Status::OK();
}
163
164
// Parses the schema element at `curr_pos` into `node_field`.
// Group nodes are delegated to parse_group_field; a repeated primitive becomes a required
// list with a single physical child; any other primitive is parsed as a physical field.
// Returns InvalidArgument when `curr_pos` is outside of `t_schemas`.
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                         size_t curr_pos, FieldSchema* node_field) {
    if (curr_pos >= t_schemas.size()) {
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
    }
    const auto& element = t_schemas[curr_pos];
    if (is_group_node(element)) {
        // nested structure or nullable list
        return parse_group_field(t_schemas, curr_pos, node_field);
    }

    if (!is_repeated_node(element)) {
        // plain primitive column; an optional one adds a definition level
        const bool nullable = is_optional_node(element);
        if (nullable) {
            node_field->definition_level++;
        }
        parse_physical_field(element, nullable, node_field);
        _next_schema_pos = curr_pos + 1;
        return Status::OK();
    }

    // repeated <primitive-type> <name> (LIST)
    // produce required list<element>
    node_field->repetition_level++;
    node_field->definition_level++;
    node_field->children.resize(1);
    set_child_node_level(node_field, node_field->definition_level);
    FieldSchema* element_field = &node_field->children[0];
    parse_physical_field(element, false, element_field);

    node_field->name = element.name;
    node_field->lower_case_name = to_lower(element.name);
    node_field->data_type =
            std::make_shared<DataTypeArray>(make_nullable(element_field->data_type));
    _next_schema_pos = curr_pos + 1;
    node_field->field_id = element.__isset.field_id ? element.field_id : -1;
    return Status::OK();
}
199
200
// Fills `physical_field` from a primitive (leaf) schema element, appends it to
// `_physical_fields` and records its index in that list. The doris type and the
// type-compatibility flag come from get_doris_type(); field_id is -1 when the schema
// element does not carry one.
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
                                           bool is_nullable, FieldSchema* physical_field) {
    physical_field->name = physical_schema.name;
    physical_field->lower_case_name = to_lower(physical_field->name);
    physical_field->parquet_schema = physical_schema;
    physical_field->physical_type = physical_schema.type;
    physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id

    // The field's index is its position in the list of physical fields.
    _physical_fields.push_back(physical_field);
    physical_field->physical_column_index = cast_set<int>(_physical_fields.size() - 1);

    const auto doris_type = get_doris_type(physical_schema, is_nullable);
    physical_field->data_type = doris_type.first;
    physical_field->is_type_compatibility = doris_type.second;

    if (physical_schema.__isset.field_id) {
        physical_field->field_id = physical_schema.field_id;
    } else {
        physical_field->field_id = -1;
    }
}
214
215
std::pair<DataTypePtr, bool> FieldDescriptor::get_doris_type(
216
1.70k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
217
1.70k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
218
1.70k
    try {
219
1.70k
        if (physical_schema.__isset.logicalType) {
220
716
            ans = convert_to_doris_type(physical_schema.logicalType, nullable);
221
986
        } else if (physical_schema.__isset.converted_type) {
222
226
            ans = convert_to_doris_type(physical_schema, nullable);
223
226
        }
224
1.70k
    } catch (...) {
225
        // now the Not supported exception are ignored
226
        // so those byte_array maybe be treated as varbinary(now) : string(before)
227
0
    }
228
1.70k
    if (ans.first->get_primitive_type() == PrimitiveType::INVALID_TYPE) {
229
760
        switch (physical_schema.type) {
230
93
        case tparquet::Type::BOOLEAN:
231
93
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, nullable);
232
93
            break;
233
189
        case tparquet::Type::INT32:
234
189
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
235
189
            break;
236
123
        case tparquet::Type::INT64:
237
123
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
238
123
            break;
239
12
        case tparquet::Type::INT96:
240
12
            if (_enable_mapping_timestamp_tz) {
241
                // treat INT96 as TIMESTAMPTZ
242
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, nullable,
243
0
                                                                         0, 6);
244
12
            } else {
245
                // in most cases, it's a nano timestamp
246
12
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable,
247
12
                                                                         0, 6);
248
12
            }
249
12
            break;
250
93
        case tparquet::Type::FLOAT:
251
93
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
252
93
            break;
253
205
        case tparquet::Type::DOUBLE:
254
205
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, nullable);
255
205
            break;
256
45
        case tparquet::Type::BYTE_ARRAY:
257
45
            if (_enable_mapping_varbinary) {
258
                // if physical_schema not set logicalType and converted_type,
259
                // we treat BYTE_ARRAY as VARBINARY by default, so that we can read all data directly.
260
22
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable);
261
23
            } else {
262
23
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
263
23
            }
264
45
            break;
265
0
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
266
0
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
267
0
            break;
268
0
        default:
269
0
            throw Exception(Status::InternalError("Not supported parquet logicalType{}",
270
0
                                                  physical_schema.type));
271
0
            break;
272
760
        }
273
760
    }
274
1.70k
    return ans;
275
1.70k
}
276
277
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
278
716
        tparquet::LogicalType logicalType, bool nullable) {
279
716
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
280
716
    bool& is_type_compatibility = ans.second;
281
716
    if (logicalType.__isset.STRING) {
282
472
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
283
472
    } else if (logicalType.__isset.DECIMAL) {
284
114
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, nullable,
285
114
                                                                 logicalType.DECIMAL.precision,
286
114
                                                                 logicalType.DECIMAL.scale);
287
130
    } else if (logicalType.__isset.DATE) {
288
57
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
289
73
    } else if (logicalType.__isset.INTEGER) {
290
0
        if (logicalType.INTEGER.isSigned) {
291
0
            if (logicalType.INTEGER.bitWidth <= 8) {
292
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
293
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
294
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
295
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
296
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
297
0
            } else {
298
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
299
0
            }
300
0
        } else {
301
0
            is_type_compatibility = true;
302
0
            if (logicalType.INTEGER.bitWidth <= 8) {
303
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
304
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
305
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
306
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
307
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
308
0
            } else {
309
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
310
0
            }
311
0
        }
312
73
    } else if (logicalType.__isset.TIME) {
313
0
        ans.first = DataTypeFactory::instance().create_data_type(
314
0
                TYPE_TIMEV2, nullable, 0, logicalType.TIME.unit.__isset.MILLIS ? 3 : 6);
315
73
    } else if (logicalType.__isset.TIMESTAMP) {
316
69
        if (_enable_mapping_timestamp_tz) {
317
0
            if (logicalType.TIMESTAMP.isAdjustedToUTC) {
318
                // treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
319
0
                ans.first = DataTypeFactory::instance().create_data_type(
320
0
                        TYPE_TIMESTAMPTZ, nullable, 0,
321
0
                        logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
322
0
                return ans;
323
0
            }
324
0
        }
325
69
        ans.first = DataTypeFactory::instance().create_data_type(
326
69
                TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
327
69
    } else if (logicalType.__isset.JSON) {
328
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
329
4
    } else if (logicalType.__isset.UUID) {
330
4
        if (_enable_mapping_varbinary) {
331
3
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1,
332
3
                                                                     -1, 16);
333
3
        } else {
334
1
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
335
1
        }
336
4
    } else if (logicalType.__isset.FLOAT16) {
337
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
338
0
    } else {
339
0
        throw Exception(Status::InternalError("Not supported parquet logicalType"));
340
0
    }
341
716
    return ans;
342
716
}
343
344
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
345
226
        const tparquet::SchemaElement& physical_schema, bool nullable) {
346
226
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
347
226
    bool& is_type_compatibility = ans.second;
348
226
    switch (physical_schema.converted_type) {
349
106
    case tparquet::ConvertedType::type::UTF8:
350
106
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
351
106
        break;
352
24
    case tparquet::ConvertedType::type::DECIMAL:
353
24
        ans.first = DataTypeFactory::instance().create_data_type(
354
24
                TYPE_DECIMAL128I, nullable, physical_schema.precision, physical_schema.scale);
355
24
        break;
356
24
    case tparquet::ConvertedType::type::DATE:
357
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
358
24
        break;
359
0
    case tparquet::ConvertedType::type::TIME_MILLIS:
360
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 3);
361
0
        break;
362
0
    case tparquet::ConvertedType::type::TIME_MICROS:
363
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 6);
364
0
        break;
365
0
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
366
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 3);
367
0
        break;
368
24
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
369
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 6);
370
24
        break;
371
24
    case tparquet::ConvertedType::type::INT_8:
372
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
373
24
        break;
374
0
    case tparquet::ConvertedType::type::UINT_8:
375
0
        is_type_compatibility = true;
376
0
        [[fallthrough]];
377
24
    case tparquet::ConvertedType::type::INT_16:
378
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
379
24
        break;
380
0
    case tparquet::ConvertedType::type::UINT_16:
381
0
        is_type_compatibility = true;
382
0
        [[fallthrough]];
383
0
    case tparquet::ConvertedType::type::INT_32:
384
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
385
0
        break;
386
0
    case tparquet::ConvertedType::type::UINT_32:
387
0
        is_type_compatibility = true;
388
0
        [[fallthrough]];
389
0
    case tparquet::ConvertedType::type::INT_64:
390
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
391
0
        break;
392
0
    case tparquet::ConvertedType::type::UINT_64:
393
0
        is_type_compatibility = true;
394
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
395
0
        break;
396
0
    case tparquet::ConvertedType::type::JSON:
397
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
398
0
        break;
399
0
    default:
400
0
        throw Exception(Status::InternalError("Not supported parquet ConvertedType: {}",
401
0
                                              physical_schema.converted_type));
402
226
    }
403
226
    return ans;
404
226
}
405
406
// Parses the group schema element at `curr_pos` into `group_field`.
// Dispatch order: VARIANT, MAP, LIST, then a repeated group (a non-null list of struct),
// and finally a plain struct.
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                          size_t curr_pos, FieldSchema* group_field) {
    const auto& group_schema = t_schemas[curr_pos];
    if (is_variant_node(group_schema)) {
        return parse_variant_field(t_schemas, curr_pos, group_field);
    }
    if (is_map_node(group_schema)) {
        // the map definition:
        // optional group <name> (MAP) {
        //   repeated group map (MAP_KEY_VALUE) {
        //     required <type> key;
        //     optional <type> value;
        //   }
        // }
        return parse_map_field(t_schemas, curr_pos, group_field);
    }
    if (is_list_node(group_schema)) {
        // the list definition:
        // optional group <name> (LIST) {
        //   repeated group [bag | list] { // hive or spark
        //     optional <type> [array_element | element]; // hive or spark
        //   }
        // }
        return parse_list_field(t_schemas, curr_pos, group_field);
    }

    if (!is_repeated_node(group_schema)) {
        // an ordinary nested struct
        return parse_struct_field(t_schemas, curr_pos, group_field);
    }

    // the list of struct:
    // repeated group <name> (LIST) {
    //   optional/required <type> <name>;
    //   ...
    // }
    // produce a non-null list<struct>
    group_field->repetition_level++;
    group_field->definition_level++;
    group_field->children.resize(1);
    set_child_node_level(group_field, group_field->definition_level);
    FieldSchema* element_struct = &group_field->children[0];
    RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, element_struct));

    group_field->name = group_schema.name;
    group_field->lower_case_name = to_lower(group_field->name);
    group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    group_field->data_type =
            std::make_shared<DataTypeArray>(make_nullable(element_struct->data_type));
    group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
    return Status::OK();
}
458
459
// Parses a VARIANT group. The group is first parsed as a struct, then validated: it must
// contain a BYTE_ARRAY child named "metadata" and either a BYTE_ARRAY child named "value"
// or a child named "typed_value" (names compared in lower case). On success the field's
// data type is replaced by a variant type, made nullable when the group is optional.
Status FieldDescriptor::parse_variant_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                            size_t curr_pos, FieldSchema* variant_field) {
    RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, variant_field));

    bool has_metadata = false;
    bool has_value = false;
    bool has_typed_value = false;
    for (const auto& member : variant_field->children) {
        const auto& member_name = member.lower_case_name;
        const bool is_binary = member.physical_type == tparquet::Type::BYTE_ARRAY;
        if (member_name == "metadata") {
            has_metadata = is_binary;
        } else if (member_name == "value") {
            has_value = is_binary;
        } else if (member_name == "typed_value") {
            has_typed_value = true;
        }
    }

    const bool is_well_formed = has_metadata && (has_value || has_typed_value);
    if (!is_well_formed) {
        return Status::InvalidArgument(
                "Parquet VARIANT field '{}' must contain binary metadata and value or typed_value "
                "fields",
                variant_field->name);
    }

    variant_field->data_type = std::make_shared<DataTypeVariant>(0, false);
    if (is_optional_node(t_schemas[curr_pos])) {
        variant_field->data_type = make_nullable(variant_field->data_type);
    }
    return Status::OK();
}
488
489
// Parses a LIST annotated group at `curr_pos` into `list_field`.
//
// the list definition:
// spark and hive have three level schemas but with different schema name
// spark: <column-name> - "list" - "element"
// hive: <column-name> - "bag" - "array_element"
// parse three level schemas to two level primitive like: LIST<INT>,
// or nested structure like: LIST<MAP<INT, INT>>
//
// Returns InvalidArgument when the first level does not have exactly one child, when the
// second level is missing or not repeated, when the first level itself is repeated, or
// when the second level declares a negative number of children.
Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                         size_t curr_pos, FieldSchema* list_field) {
    const auto& first_level = t_schemas[curr_pos];
    if (first_level.num_children != 1) {
        return Status::InvalidArgument("List element should have only one child");
    }

    if (curr_pos + 1 >= t_schemas.size()) {
        return Status::InvalidArgument("List element should have the second level schema");
    }

    // Use the __isset-aware helpers, like the other parse functions in this file do.
    if (is_repeated_node(first_level)) {
        return Status::InvalidArgument("List element can't be a repeated schema");
    }

    // the repeated schema element
    const auto& second_level = t_schemas[curr_pos + 1];
    if (!is_repeated_node(second_level)) {
        return Status::InvalidArgument("The second level of list element should be repeated");
    }

    // Keep the child count signed: a negative value from a corrupt file must be rejected
    // instead of being converted to a huge unsigned count.
    const int num_children = num_children_node(second_level);
    if (num_children < 0) {
        return Status::InvalidArgument(
                "The second level of list element has an invalid number of children: {}",
                num_children);
    }

    // This indicates if this list is nullable.
    const bool is_optional = is_optional_node(first_level);
    if (is_optional) {
        list_field->definition_level++;
    }
    list_field->repetition_level++;
    list_field->definition_level++;
    list_field->children.resize(1);
    FieldSchema* list_child = &list_field->children[0];
    // the list is a repeated node, so its definition level is the
    // `repeated_parent_def_level` of the element
    set_child_node_level(list_field, list_field->definition_level);

    if (num_children == 0) {
        // required two level list, for compatibility reason.
        parse_physical_field(second_level, false, list_child);
        _next_schema_pos = curr_pos + 2;
    } else if (num_children == 1 && !is_struct_list_node(second_level)) {
        // optional field, and the third level element is the nested structure in list
        // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>>
        // skip bag/list, it's a repeated element.
        RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child));
    } else {
        // required field, produce the list of struct
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child));
    }

    list_field->name = first_level.name;
    list_field->lower_case_name = to_lower(first_level.name);
    list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    list_field->data_type =
            std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
    if (is_optional) {
        list_field->data_type = make_nullable(list_field->data_type);
    }
    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

    return Status::OK();
}
558
559
// Parses a MAP annotated group at `curr_pos` into `map_field`.
//
// the map definition in parquet:
// optional group <name> (MAP) {
//   repeated group map (MAP_KEY_VALUE) {
//     required <type> key;
//     optional <type> value;
//   }
// }
// Map value can be optional, the map without values is a SET
//
// A key/value group with a single child is parsed as a list (SET). Otherwise the key and
// value become the two direct children of `map_field`; no intermediate key_value node is
// kept. Returns InvalidArgument when the layout does not match the definition above.
Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                        size_t curr_pos, FieldSchema* map_field) {
    if (curr_pos + 2 >= t_schemas.size()) {
        return Status::InvalidArgument("Map element should have at least three levels");
    }
    const auto& map_schema = t_schemas[curr_pos];
    if (map_schema.num_children != 1) {
        return Status::InvalidArgument(
                "Map element should have only one child(name='map', type='MAP_KEY_VALUE')");
    }
    if (is_repeated_node(map_schema)) {
        return Status::InvalidArgument("Map element can't be a repeated schema");
    }
    const auto& map_key_value = t_schemas[curr_pos + 1];
    if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) {
        return Status::InvalidArgument(
                "the second level in map must be a repeated group(key and value)");
    }
    const auto& map_key = t_schemas[curr_pos + 2];
    if (!is_required_node(map_key)) {
        // Not an error: the map is still parsed, the situation is only logged.
        LOG(WARNING) << "Field " << map_schema.name << " is map type, but with nullable key column";
    }

    if (map_key_value.num_children == 1) {
        // The map with three levels is a SET
        return parse_list_field(t_schemas, curr_pos, map_field);
    }
    if (map_key_value.num_children != 2) {
        // A standard map should have four levels
        return Status::InvalidArgument(
                "the second level in map(MAP_KEY_VALUE) should have two children");
    }
    // standard map
    const bool is_optional = is_optional_node(map_schema);
    if (is_optional) {
        map_field->definition_level++;
    }
    map_field->repetition_level++;
    map_field->definition_level++;

    // Directly create key and value children instead of intermediate key_value node
    map_field->children.resize(2);
    // map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
    set_child_node_level(map_field, map_field->definition_level);

    FieldSchema* key_field = &map_field->children[0];
    FieldSchema* value_field = &map_field->children[1];

    // Parse key and value fields directly from the key_value group's children
    _next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
    // parsing the key advanced _next_schema_pos to the value element
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));

    map_field->name = map_schema.name;
    map_field->lower_case_name = to_lower(map_field->name);
    map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
                                                         make_nullable(value_field->data_type));
    if (is_optional) {
        map_field->data_type = make_nullable(map_field->data_type);
    }
    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

    return Status::OK();
}
632
633
// Parses the group at `curr_pos` as a struct: every child schema element is parsed
// recursively, then a struct data type is built from the (nullable) child types and the
// child names. The struct type itself is made nullable when the group is optional.
Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                           size_t curr_pos, FieldSchema* struct_field) {
    // the nested column in parquet, parse group to struct.
    const auto& struct_schema = t_schemas[curr_pos];
    const bool is_optional = is_optional_node(struct_schema);
    if (is_optional) {
        struct_field->definition_level++;
    }
    const auto child_count = struct_schema.num_children;
    struct_field->children.resize(child_count);
    // a struct is not a repeated node, so the children keep the struct's own
    // `repeated_parent_def_level`
    set_child_node_level(struct_field, struct_field->repeated_parent_def_level);
    _next_schema_pos = curr_pos + 1;
    for (int idx = 0; idx < child_count; ++idx) {
        FieldSchema* member = &struct_field->children[idx];
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, member));
    }
    struct_field->name = struct_schema.name;
    struct_field->lower_case_name = to_lower(struct_field->name);
    struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id

    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;

    DataTypes member_types;
    std::vector<String> member_names;
    for (int idx = 0; idx < child_count; ++idx) {
        const auto& member = struct_field->children[idx];
        member_types.push_back(make_nullable(member.data_type));
        member_names.push_back(member.name);
    }
    struct_field->data_type = std::make_shared<DataTypeStruct>(member_types, member_names);
    if (is_optional) {
        struct_field->data_type = make_nullable(struct_field->data_type);
    }
    return Status::OK();
}
665
666
5
int FieldDescriptor::get_column_index(const std::string& column) const {
667
15
    for (int32_t i = 0; i < _fields.size(); i++) {
668
15
        if (_fields[i].name == column) {
669
5
            return i;
670
5
        }
671
15
    }
672
0
    return -1;
673
5
}
674
675
1.02k
// Look up a field by name in _name_to_field.
//
// Returns the mapped FieldSchema pointer when `name` is present.
// Throws Exception (wrapping Status::InternalError) when `name` is absent, so
// this function never returns nullptr for a missing name.
FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
    auto it = _name_to_field.find(name);
    if (it == _name_to_field.end()) {
        throw Exception(Status::InternalError("Name {} not found in FieldDescriptor!", name));
    }
    return it->second;
}
683
684
14
void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const {
685
14
    names->clear();
686
210
    for (const FieldSchema& f : _fields) {
687
210
        names->emplace(f.name);
688
210
    }
689
14
}
690
691
0
std::string FieldDescriptor::debug_string() const {
692
0
    std::stringstream ss;
693
0
    ss << "fields=[";
694
0
    for (int i = 0; i < _fields.size(); ++i) {
695
0
        if (i != 0) {
696
0
            ss << ", ";
697
0
        }
698
0
        ss << _fields[i].debug_string();
699
0
    }
700
0
    ss << "]";
701
0
    return ss.str();
702
0
}
703
704
40
void FieldDescriptor::assign_ids() {
705
40
    uint64_t next_id = 1;
706
371
    for (auto& field : _fields) {
707
371
        field.assign_ids(next_id);
708
371
    }
709
40
}
710
711
0
// Search each field in _fields, in order, via FieldSchema::find_column_by_id().
// Returns the first non-null result, or nullptr when no field reports a match.
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
    for (const auto& top_level_field : _fields) {
        const FieldSchema* match = top_level_field.find_column_by_id(column_id);
        if (match != nullptr) {
            return match;
        }
    }
    return nullptr;
}
719
720
1.91k
void FieldSchema::assign_ids(uint64_t& next_id) {
721
1.91k
    column_id = next_id++;
722
723
1.91k
    for (auto& child : children) {
724
1.54k
        child.assign_ids(next_id);
725
1.54k
    }
726
727
1.91k
    max_column_id = next_id - 1;
728
1.91k
}
729
730
0
// Depth-first search of this node and its descendants for the node whose
// column_id equals `target_id`. Returns that node, or nullptr when neither
// this node nor any descendant has the id.
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
    if (column_id == target_id) {
        return this;
    }

    for (const auto& nested : children) {
        const FieldSchema* match = nested.find_column_by_id(target_id);
        if (match != nullptr) {
            return match;
        }
    }

    return nullptr;
}
743
744
413
uint64_t FieldSchema::get_column_id() const {
745
413
    return column_id;
746
413
}
747
748
0
void FieldSchema::set_column_id(uint64_t id) {
749
0
    column_id = id;
750
0
}
751
752
88
uint64_t FieldSchema::get_max_column_id() const {
753
88
    return max_column_id;
754
88
}
755
756
} // namespace doris