Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/schema_desc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/schema_desc.h"
19
20
#include <ctype.h>
21
22
#include <algorithm>
23
#include <ostream>
24
#include <utility>
25
26
#include "common/cast_set.h"
27
#include "common/logging.h"
28
#include "core/data_type/data_type_array.h"
29
#include "core/data_type/data_type_factory.hpp"
30
#include "core/data_type/data_type_map.h"
31
#include "core/data_type/data_type_struct.h"
32
#include "core/data_type/define_primitive_type.h"
33
#include "format/table/table_format_reader.h"
34
#include "util/slice.h"
35
#include "util/string_util.h"
36
37
namespace doris {
38
#include "common/compile_check_begin.h"
39
40
4.74k
static bool is_group_node(const tparquet::SchemaElement& schema) {
41
4.74k
    return schema.num_children > 0;
42
4.74k
}
43
44
784
static bool is_list_node(const tparquet::SchemaElement& schema) {
45
784
    return schema.__isset.converted_type && schema.converted_type == tparquet::ConvertedType::LIST;
46
784
}
47
48
1.05k
static bool is_map_node(const tparquet::SchemaElement& schema) {
49
1.05k
    return schema.__isset.converted_type &&
50
1.05k
           (schema.converted_type == tparquet::ConvertedType::MAP ||
51
735
            schema.converted_type == tparquet::ConvertedType::MAP_KEY_VALUE);
52
1.05k
}
53
54
3.98k
static bool is_repeated_node(const tparquet::SchemaElement& schema) {
55
3.98k
    return schema.__isset.repetition_type &&
56
3.98k
           schema.repetition_type == tparquet::FieldRepetitionType::REPEATED;
57
3.98k
}
58
59
267
static bool is_required_node(const tparquet::SchemaElement& schema) {
60
267
    return schema.__isset.repetition_type &&
61
267
           schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED;
62
267
}
63
64
4.19k
static bool is_optional_node(const tparquet::SchemaElement& schema) {
65
4.19k
    return schema.__isset.repetition_type &&
66
4.19k
           schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL;
67
4.19k
}
68
69
468
static int num_children_node(const tparquet::SchemaElement& schema) {
70
468
    return schema.__isset.num_children ? schema.num_children : 0;
71
468
}
72
73
/**
74
 * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
75
 * Empty array/map values are not stored in doris columns, so have to use `repeated_parent_def_level` to skip the
76
 * empty or null values in ancestor node.
77
 *
78
 * For instance, considering an array of strings with 3 rows like the following:
79
 * null, [], [a, b, c]
80
 * We can store four elements in data column: null, a, b, c
81
 * and the offsets column is: 1, 1, 4
82
 * and the null map is: 1, 0, 0
83
 * For the i-th row in array column: range from `offsets[i - 1]` until `offsets[i]` represents the elements in this row,
84
 * so we can't store empty array/map values in doris data column.
85
 * As a comparison, spark does not require `repeated_parent_def_level`,
86
 * because the spark column stores empty array/map values , and use anther length column to indicate empty values.
87
 * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
88
 *
89
 * Furthermore, we can also avoid store null array/map values in doris data column.
90
 * The same three rows as above, We can only store three elements in data column: a, b, c
91
 * and the offsets column is: 0, 0, 3
92
 * and the null map is: 1, 0, 0
93
 *
94
 * Inherit the repetition and definition level from parent node, if the parent node is repeated,
95
 * we should set repeated_parent_def_level = definition_level, otherwise as repeated_parent_def_level.
96
 * @param parent parent node
97
 * @param repeated_parent_def_level the first ancestor node whose repetition_type equals REPEATED
98
 */
99
1.05k
static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
100
1.85k
    for (auto& child : parent->children) {
101
1.85k
        child.repetition_level = parent->repetition_level;
102
1.85k
        child.definition_level = parent->definition_level;
103
1.85k
        child.repeated_parent_def_level = repeated_parent_def_level;
104
1.85k
    }
105
1.05k
}
106
107
468
static bool is_struct_list_node(const tparquet::SchemaElement& schema) {
108
468
    const std::string& name = schema.name;
109
468
    static const Slice array_slice("array", 5);
110
468
    static const Slice tuple_slice("_tuple", 6);
111
468
    Slice slice(name);
112
468
    return slice == array_slice || slice.ends_with(tuple_slice);
113
468
}
114
115
0
std::string FieldSchema::debug_string() const {
116
0
    std::stringstream ss;
117
0
    ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level;
118
0
    if (children.size() > 0) {
119
0
        ss << ", type=" << data_type->get_name() << ", children=[";
120
0
        for (int i = 0; i < children.size(); ++i) {
121
0
            if (i != 0) {
122
0
                ss << ", ";
123
0
            }
124
0
            ss << children[i].debug_string();
125
0
        }
126
0
        ss << "]";
127
0
    } else {
128
0
        ss << ", physical_type=" << physical_type;
129
0
        ss << " , doris_type=" << data_type->get_name();
130
0
    }
131
0
    ss << ")";
132
0
    return ss.str();
133
0
}
134
135
284
Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) {
136
284
    if (t_schemas.size() == 0 || !is_group_node(t_schemas[0])) {
137
0
        return Status::InvalidArgument("Wrong parquet root schema element");
138
0
    }
139
284
    const auto& root_schema = t_schemas[0];
140
284
    _fields.resize(root_schema.num_children);
141
284
    _next_schema_pos = 1;
142
143
2.62k
    for (int i = 0; i < root_schema.num_children; ++i) {
144
2.33k
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
145
2.33k
        if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
146
0
            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
147
0
        }
148
2.33k
        _name_to_field.emplace(_fields[i].name, &_fields[i]);
149
2.33k
    }
150
151
284
    if (_next_schema_pos != t_schemas.size()) {
152
0
        return Status::InvalidArgument("Remaining {} unparsed schema elements",
153
0
                                       t_schemas.size() - _next_schema_pos);
154
0
    }
155
156
284
    return Status::OK();
157
284
}
158
159
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
160
4.19k
                                         size_t curr_pos, FieldSchema* node_field) {
161
4.19k
    if (curr_pos >= t_schemas.size()) {
162
0
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
163
0
    }
164
4.19k
    auto& t_schema = t_schemas[curr_pos];
165
4.19k
    if (is_group_node(t_schema)) {
166
        // nested structure or nullable list
167
1.05k
        return parse_group_field(t_schemas, curr_pos, node_field);
168
1.05k
    }
169
3.13k
    if (is_repeated_node(t_schema)) {
170
        // repeated <primitive-type> <name> (LIST)
171
        // produce required list<element>
172
0
        node_field->repetition_level++;
173
0
        node_field->definition_level++;
174
0
        node_field->children.resize(1);
175
0
        set_child_node_level(node_field, node_field->definition_level);
176
0
        auto child = &node_field->children[0];
177
0
        parse_physical_field(t_schema, false, child);
178
179
0
        node_field->name = t_schema.name;
180
0
        node_field->lower_case_name = to_lower(t_schema.name);
181
0
        node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
182
0
        _next_schema_pos = curr_pos + 1;
183
0
        node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
184
3.13k
    } else {
185
3.13k
        bool is_optional = is_optional_node(t_schema);
186
3.13k
        if (is_optional) {
187
2.59k
            node_field->definition_level++;
188
2.59k
        }
189
3.13k
        parse_physical_field(t_schema, is_optional, node_field);
190
3.13k
        _next_schema_pos = curr_pos + 1;
191
3.13k
    }
192
3.13k
    return Status::OK();
193
4.19k
}
194
195
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
196
3.13k
                                           bool is_nullable, FieldSchema* physical_field) {
197
3.13k
    physical_field->name = physical_schema.name;
198
3.13k
    physical_field->lower_case_name = to_lower(physical_field->name);
199
3.13k
    physical_field->parquet_schema = physical_schema;
200
3.13k
    physical_field->physical_type = physical_schema.type;
201
3.13k
    physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
202
3.13k
    _physical_fields.push_back(physical_field);
203
3.13k
    physical_field->physical_column_index = cast_set<int>(_physical_fields.size() - 1);
204
3.13k
    auto type = get_doris_type(physical_schema, is_nullable);
205
3.13k
    physical_field->data_type = type.first;
206
3.13k
    physical_field->is_type_compatibility = type.second;
207
3.13k
    physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
208
3.13k
}
209
210
std::pair<DataTypePtr, bool> FieldDescriptor::get_doris_type(
211
3.13k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
212
3.13k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
213
3.13k
    try {
214
3.13k
        if (physical_schema.__isset.logicalType) {
215
1.39k
            ans = convert_to_doris_type(physical_schema.logicalType, nullable);
216
1.74k
        } else if (physical_schema.__isset.converted_type) {
217
439
            ans = convert_to_doris_type(physical_schema, nullable);
218
439
        }
219
3.13k
    } catch (...) {
220
        // now the Not supported exception are ignored
221
        // so those byte_array maybe be treated as varbinary(now) : string(before)
222
0
    }
223
3.13k
    if (ans.first->get_primitive_type() == PrimitiveType::INVALID_TYPE) {
224
1.30k
        switch (physical_schema.type) {
225
147
        case tparquet::Type::BOOLEAN:
226
147
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, nullable);
227
147
            break;
228
436
        case tparquet::Type::INT32:
229
436
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
230
436
            break;
231
225
        case tparquet::Type::INT64:
232
225
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
233
225
            break;
234
66
        case tparquet::Type::INT96:
235
66
            if (_enable_mapping_timestamp_tz) {
236
                // treat INT96 as TIMESTAMPTZ
237
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, nullable,
238
0
                                                                         0, 6);
239
66
            } else {
240
                // in most cases, it's a nano timestamp
241
66
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable,
242
66
                                                                         0, 6);
243
66
            }
244
66
            break;
245
128
        case tparquet::Type::FLOAT:
246
128
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
247
128
            break;
248
251
        case tparquet::Type::DOUBLE:
249
251
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, nullable);
250
251
            break;
251
52
        case tparquet::Type::BYTE_ARRAY:
252
52
            if (_enable_mapping_varbinary) {
253
                // if physical_schema not set logicalType and converted_type,
254
                // we treat BYTE_ARRAY as VARBINARY by default, so that we can read all data directly.
255
22
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable);
256
30
            } else {
257
30
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
258
30
            }
259
52
            break;
260
0
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
261
0
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
262
0
            break;
263
0
        default:
264
0
            throw Exception(Status::InternalError("Not supported parquet logicalType{}",
265
0
                                                  physical_schema.type));
266
0
            break;
267
1.30k
        }
268
1.30k
    }
269
3.13k
    return ans;
270
3.13k
}
271
272
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
273
1.39k
        tparquet::LogicalType logicalType, bool nullable) {
274
1.39k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
275
1.39k
    bool& is_type_compatibility = ans.second;
276
1.39k
    if (logicalType.__isset.STRING) {
277
927
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
278
927
    } else if (logicalType.__isset.DECIMAL) {
279
152
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, nullable,
280
152
                                                                 logicalType.DECIMAL.precision,
281
152
                                                                 logicalType.DECIMAL.scale);
282
316
    } else if (logicalType.__isset.DATE) {
283
96
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
284
220
    } else if (logicalType.__isset.INTEGER) {
285
122
        if (logicalType.INTEGER.isSigned) {
286
122
            if (logicalType.INTEGER.bitWidth <= 8) {
287
30
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
288
92
            } else if (logicalType.INTEGER.bitWidth <= 16) {
289
92
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
290
92
            } else if (logicalType.INTEGER.bitWidth <= 32) {
291
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
292
0
            } else {
293
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
294
0
            }
295
122
        } else {
296
0
            is_type_compatibility = true;
297
0
            if (logicalType.INTEGER.bitWidth <= 8) {
298
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
299
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
300
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
301
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
302
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
303
0
            } else {
304
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
305
0
            }
306
0
        }
307
122
    } else if (logicalType.__isset.TIME) {
308
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable);
309
98
    } else if (logicalType.__isset.TIMESTAMP) {
310
94
        if (_enable_mapping_timestamp_tz) {
311
0
            if (logicalType.TIMESTAMP.isAdjustedToUTC) {
312
                // treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
313
0
                ans.first = DataTypeFactory::instance().create_data_type(
314
0
                        TYPE_TIMESTAMPTZ, nullable, 0,
315
0
                        logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
316
0
                return ans;
317
0
            }
318
0
        }
319
94
        ans.first = DataTypeFactory::instance().create_data_type(
320
94
                TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
321
94
    } else if (logicalType.__isset.JSON) {
322
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
323
4
    } else if (logicalType.__isset.UUID) {
324
4
        if (_enable_mapping_varbinary) {
325
3
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1,
326
3
                                                                     -1, 16);
327
3
        } else {
328
1
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
329
1
        }
330
4
    } else if (logicalType.__isset.FLOAT16) {
331
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
332
0
    } else {
333
0
        throw Exception(Status::InternalError("Not supported parquet logicalType"));
334
0
    }
335
1.39k
    return ans;
336
1.39k
}
337
338
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
339
439
        const tparquet::SchemaElement& physical_schema, bool nullable) {
340
439
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
341
439
    bool& is_type_compatibility = ans.second;
342
439
    switch (physical_schema.converted_type) {
343
178
    case tparquet::ConvertedType::type::UTF8:
344
178
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
345
178
        break;
346
78
    case tparquet::ConvertedType::type::DECIMAL:
347
78
        ans.first = DataTypeFactory::instance().create_data_type(
348
78
                TYPE_DECIMAL128I, nullable, physical_schema.precision, physical_schema.scale);
349
78
        break;
350
72
    case tparquet::ConvertedType::type::DATE:
351
72
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
352
72
        break;
353
0
    case tparquet::ConvertedType::type::TIME_MILLIS:
354
0
        [[fallthrough]];
355
0
    case tparquet::ConvertedType::type::TIME_MICROS:
356
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable);
357
0
        break;
358
0
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
359
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 3);
360
0
        break;
361
24
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
362
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 6);
363
24
        break;
364
42
    case tparquet::ConvertedType::type::INT_8:
365
42
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
366
42
        break;
367
0
    case tparquet::ConvertedType::type::UINT_8:
368
0
        is_type_compatibility = true;
369
0
        [[fallthrough]];
370
45
    case tparquet::ConvertedType::type::INT_16:
371
45
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
372
45
        break;
373
0
    case tparquet::ConvertedType::type::UINT_16:
374
0
        is_type_compatibility = true;
375
0
        [[fallthrough]];
376
0
    case tparquet::ConvertedType::type::INT_32:
377
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
378
0
        break;
379
0
    case tparquet::ConvertedType::type::UINT_32:
380
0
        is_type_compatibility = true;
381
0
        [[fallthrough]];
382
0
    case tparquet::ConvertedType::type::INT_64:
383
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
384
0
        break;
385
0
    case tparquet::ConvertedType::type::UINT_64:
386
0
        is_type_compatibility = true;
387
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
388
0
        break;
389
0
    case tparquet::ConvertedType::type::JSON:
390
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
391
0
        break;
392
0
    default:
393
0
        throw Exception(Status::InternalError("Not supported parquet ConvertedType: {}",
394
0
                                              physical_schema.converted_type));
395
439
    }
396
439
    return ans;
397
439
}
398
399
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
400
1.05k
                                          size_t curr_pos, FieldSchema* group_field) {
401
1.05k
    auto& group_schema = t_schemas[curr_pos];
402
1.05k
    if (is_map_node(group_schema)) {
403
        // the map definition:
404
        // optional group <name> (MAP) {
405
        //   repeated group map (MAP_KEY_VALUE) {
406
        //     required <type> key;
407
        //     optional <type> value;
408
        //   }
409
        // }
410
267
        return parse_map_field(t_schemas, curr_pos, group_field);
411
267
    }
412
784
    if (is_list_node(group_schema)) {
413
        // the list definition:
414
        // optional group <name> (LIST) {
415
        //   repeated group [bag | list] { // hive or spark
416
        //     optional <type> [array_element | element]; // hive or spark
417
        //   }
418
        // }
419
468
        return parse_list_field(t_schemas, curr_pos, group_field);
420
468
    }
421
422
316
    if (is_repeated_node(group_schema)) {
423
0
        group_field->repetition_level++;
424
0
        group_field->definition_level++;
425
0
        group_field->children.resize(1);
426
0
        set_child_node_level(group_field, group_field->definition_level);
427
0
        auto struct_field = &group_field->children[0];
428
        // the list of struct:
429
        // repeated group <name> (LIST) {
430
        //   optional/required <type> <name>;
431
        //   ...
432
        // }
433
        // produce a non-null list<struct>
434
0
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field));
435
436
0
        group_field->name = group_schema.name;
437
0
        group_field->lower_case_name = to_lower(group_field->name);
438
0
        group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
439
0
        group_field->data_type =
440
0
                std::make_shared<DataTypeArray>(make_nullable(struct_field->data_type));
441
0
        group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
442
316
    } else {
443
316
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
444
316
    }
445
446
316
    return Status::OK();
447
316
}
448
449
Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas,
450
468
                                         size_t curr_pos, FieldSchema* list_field) {
451
    // the list definition:
452
    // spark and hive have three level schemas but with different schema name
453
    // spark: <column-name> - "list" - "element"
454
    // hive: <column-name> - "bag" - "array_element"
455
    // parse three level schemas to two level primitive like: LIST<INT>,
456
    // or nested structure like: LIST<MAP<INT, INT>>
457
468
    auto& first_level = t_schemas[curr_pos];
458
468
    if (first_level.num_children != 1) {
459
0
        return Status::InvalidArgument("List element should have only one child");
460
0
    }
461
462
468
    if (curr_pos + 1 >= t_schemas.size()) {
463
0
        return Status::InvalidArgument("List element should have the second level schema");
464
0
    }
465
466
468
    if (first_level.repetition_type == tparquet::FieldRepetitionType::REPEATED) {
467
0
        return Status::InvalidArgument("List element can't be a repeated schema");
468
0
    }
469
470
    // the repeated schema element
471
468
    auto& second_level = t_schemas[curr_pos + 1];
472
468
    if (second_level.repetition_type != tparquet::FieldRepetitionType::REPEATED) {
473
0
        return Status::InvalidArgument("The second level of list element should be repeated");
474
0
    }
475
476
    // This indicates if this list is nullable.
477
468
    bool is_optional = is_optional_node(first_level);
478
468
    if (is_optional) {
479
421
        list_field->definition_level++;
480
421
    }
481
468
    list_field->repetition_level++;
482
468
    list_field->definition_level++;
483
468
    list_field->children.resize(1);
484
468
    FieldSchema* list_child = &list_field->children[0];
485
486
468
    size_t num_children = num_children_node(second_level);
487
468
    if (num_children > 0) {
488
468
        if (num_children == 1 && !is_struct_list_node(second_level)) {
489
            // optional field, and the third level element is the nested structure in list
490
            // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>>
491
            // skip bag/list, it's a repeated element.
492
468
            set_child_node_level(list_field, list_field->definition_level);
493
468
            RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child));
494
468
        } else {
495
            // required field, produce the list of struct
496
0
            set_child_node_level(list_field, list_field->definition_level);
497
0
            RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child));
498
0
        }
499
468
    } else if (num_children == 0) {
500
        // required two level list, for compatibility reason.
501
0
        set_child_node_level(list_field, list_field->definition_level);
502
0
        parse_physical_field(second_level, false, list_child);
503
0
        _next_schema_pos = curr_pos + 2;
504
0
    }
505
506
468
    list_field->name = first_level.name;
507
468
    list_field->lower_case_name = to_lower(first_level.name);
508
468
    list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
509
468
    list_field->data_type =
510
468
            std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
511
468
    if (is_optional) {
512
421
        list_field->data_type = make_nullable(list_field->data_type);
513
421
    }
514
468
    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;
515
516
468
    return Status::OK();
517
468
}
518
519
Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas,
520
267
                                        size_t curr_pos, FieldSchema* map_field) {
521
    // the map definition in parquet:
522
    // optional group <name> (MAP) {
523
    //   repeated group map (MAP_KEY_VALUE) {
524
    //     required <type> key;
525
    //     optional <type> value;
526
    //   }
527
    // }
528
    // Map value can be optional, the map without values is a SET
529
267
    if (curr_pos + 2 >= t_schemas.size()) {
530
0
        return Status::InvalidArgument("Map element should have at least three levels");
531
0
    }
532
267
    auto& map_schema = t_schemas[curr_pos];
533
267
    if (map_schema.num_children != 1) {
534
0
        return Status::InvalidArgument(
535
0
                "Map element should have only one child(name='map', type='MAP_KEY_VALUE')");
536
0
    }
537
267
    if (is_repeated_node(map_schema)) {
538
0
        return Status::InvalidArgument("Map element can't be a repeated schema");
539
0
    }
540
267
    auto& map_key_value = t_schemas[curr_pos + 1];
541
267
    if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) {
542
0
        return Status::InvalidArgument(
543
0
                "the second level in map must be a repeated group(key and value)");
544
0
    }
545
267
    auto& map_key = t_schemas[curr_pos + 2];
546
267
    if (!is_required_node(map_key)) {
547
0
        LOG(WARNING) << "Filed " << map_schema.name << " is map type, but with nullable key column";
548
0
    }
549
550
267
    if (map_key_value.num_children == 1) {
551
        // The map with three levels is a SET
552
0
        return parse_list_field(t_schemas, curr_pos, map_field);
553
0
    }
554
267
    if (map_key_value.num_children != 2) {
555
        // A standard map should have four levels
556
0
        return Status::InvalidArgument(
557
0
                "the second level in map(MAP_KEY_VALUE) should have two children");
558
0
    }
559
    // standard map
560
267
    bool is_optional = is_optional_node(map_schema);
561
267
    if (is_optional) {
562
239
        map_field->definition_level++;
563
239
    }
564
267
    map_field->repetition_level++;
565
267
    map_field->definition_level++;
566
567
    // Directly create key and value children instead of intermediate key_value node
568
267
    map_field->children.resize(2);
569
    // map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
570
267
    set_child_node_level(map_field, map_field->definition_level);
571
572
267
    auto key_field = &map_field->children[0];
573
267
    auto value_field = &map_field->children[1];
574
575
    // Parse key and value fields directly from the key_value group's children
576
267
    _next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
577
267
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
578
267
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));
579
580
267
    map_field->name = map_schema.name;
581
267
    map_field->lower_case_name = to_lower(map_field->name);
582
267
    map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
583
267
    map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
584
267
                                                         make_nullable(value_field->data_type));
585
267
    if (is_optional) {
586
239
        map_field->data_type = make_nullable(map_field->data_type);
587
239
    }
588
267
    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;
589
590
267
    return Status::OK();
591
267
}
592
593
Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas,
594
316
                                           size_t curr_pos, FieldSchema* struct_field) {
595
    // the nested column in parquet, parse group to struct.
596
316
    auto& struct_schema = t_schemas[curr_pos];
597
316
    bool is_optional = is_optional_node(struct_schema);
598
316
    if (is_optional) {
599
298
        struct_field->definition_level++;
600
298
    }
601
316
    auto num_children = struct_schema.num_children;
602
316
    struct_field->children.resize(num_children);
603
316
    set_child_node_level(struct_field, struct_field->repeated_parent_def_level);
604
316
    _next_schema_pos = curr_pos + 1;
605
1.16k
    for (int i = 0; i < num_children; ++i) {
606
852
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
607
852
    }
608
316
    struct_field->name = struct_schema.name;
609
316
    struct_field->lower_case_name = to_lower(struct_field->name);
610
316
    struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
611
612
316
    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
613
316
    DataTypes res_data_types;
614
316
    std::vector<String> names;
615
1.16k
    for (int i = 0; i < num_children; ++i) {
616
852
        res_data_types.push_back(make_nullable(struct_field->children[i].data_type));
617
852
        names.push_back(struct_field->children[i].name);
618
852
    }
619
316
    struct_field->data_type = std::make_shared<DataTypeStruct>(res_data_types, names);
620
316
    if (is_optional) {
621
298
        struct_field->data_type = make_nullable(struct_field->data_type);
622
298
    }
623
316
    return Status::OK();
624
316
}
625
626
5
int FieldDescriptor::get_column_index(const std::string& column) const {
627
15
    for (int32_t i = 0; i < _fields.size(); i++) {
628
15
        if (_fields[i].name == column) {
629
5
            return i;
630
5
        }
631
15
    }
632
0
    return -1;
633
5
}
634
635
4.79k
FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
636
4.79k
    auto it = _name_to_field.find(name);
637
4.79k
    if (it != _name_to_field.end()) {
638
4.79k
        return it->second;
639
4.79k
    }
640
0
    throw Exception(Status::InternalError("Name {} not found in FieldDescriptor!", name));
641
0
    return nullptr;
642
4.79k
}
643
644
127
void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const {
645
127
    names->clear();
646
1.24k
    for (const FieldSchema& f : _fields) {
647
1.24k
        names->emplace(f.name);
648
1.24k
    }
649
127
}
650
651
0
std::string FieldDescriptor::debug_string() const {
652
0
    std::stringstream ss;
653
0
    ss << "fields=[";
654
0
    for (int i = 0; i < _fields.size(); ++i) {
655
0
        if (i != 0) {
656
0
            ss << ", ";
657
0
        }
658
0
        ss << _fields[i].debug_string();
659
0
    }
660
0
    ss << "]";
661
0
    return ss.str();
662
0
}
663
664
34
void FieldDescriptor::assign_ids() {
665
34
    uint64_t next_id = 1;
666
360
    for (auto& field : _fields) {
667
360
        field.assign_ids(next_id);
668
360
    }
669
34
}
670
671
0
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
672
0
    for (const auto& field : _fields) {
673
0
        if (auto result = field.find_column_by_id(column_id)) {
674
0
            return result;
675
0
        }
676
0
    }
677
0
    return nullptr;
678
0
}
679
680
1.91k
void FieldSchema::assign_ids(uint64_t& next_id) {
681
1.91k
    column_id = next_id++;
682
683
1.91k
    for (auto& child : children) {
684
1.55k
        child.assign_ids(next_id);
685
1.55k
    }
686
687
1.91k
    max_column_id = next_id - 1;
688
1.91k
}
689
690
0
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
691
0
    if (column_id == target_id) {
692
0
        return this;
693
0
    }
694
695
0
    for (const auto& child : children) {
696
0
        if (auto result = child.find_column_by_id(target_id)) {
697
0
            return result;
698
0
        }
699
0
    }
700
701
0
    return nullptr;
702
0
}
703
704
373
uint64_t FieldSchema::get_column_id() const {
705
373
    return column_id;
706
373
}
707
708
0
void FieldSchema::set_column_id(uint64_t id) {
709
0
    column_id = id;
710
0
}
711
712
92
uint64_t FieldSchema::get_max_column_id() const {
713
92
    return max_column_id;
714
92
}
715
716
#include "common/compile_check_end.h"
717
718
} // namespace doris