Coverage Report

Created: 2026-05-17 00:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/schema_desc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/schema_desc.h"
19
20
#include <algorithm>
21
#include <ostream>
22
#include <utility>
23
24
#include "common/cast_set.h"
25
#include "common/logging.h"
26
#include "core/data_type/data_type_array.h"
27
#include "core/data_type/data_type_factory.hpp"
28
#include "core/data_type/data_type_map.h"
29
#include "core/data_type/data_type_struct.h"
30
#include "core/data_type/data_type_variant.h"
31
#include "core/data_type/define_primitive_type.h"
32
#include "format/generic_reader.h"
33
#include "format/table/table_schema_change_helper.h"
34
#include "util/slice.h"
35
#include "util/string_util.h"
36
37
namespace doris {
38
39
2.41k
static bool is_group_node(const tparquet::SchemaElement& schema) {
    // A schema element is treated as a group (struct/list/map/variant container) exactly when it
    // declares a positive number of children.
    const bool has_children = schema.num_children > 0;
    return has_children;
}
42
43
443
static bool is_list_node(const tparquet::SchemaElement& schema) {
    // The LIST annotation only counts when converted_type is marked as present.
    if (!schema.__isset.converted_type) {
        return false;
    }
    return schema.converted_type == tparquet::ConvertedType::LIST;
}
46
47
519
static bool is_map_node(const tparquet::SchemaElement& schema) {
    // Both the MAP annotation and the legacy MAP_KEY_VALUE annotation mark a map group,
    // provided converted_type is marked as present.
    if (!schema.__isset.converted_type) {
        return false;
    }
    const auto annotation = schema.converted_type;
    return annotation == tparquet::ConvertedType::MAP ||
           annotation == tparquet::ConvertedType::MAP_KEY_VALUE;
}
52
53
2.15k
static bool is_repeated_node(const tparquet::SchemaElement& schema) {
    // An element without an explicit repetition_type is never considered REPEATED.
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::REPEATED;
}
57
58
76
static bool is_required_node(const tparquet::SchemaElement& schema) {
    // An element without an explicit repetition_type is never considered REQUIRED here.
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED;
}
62
63
2.23k
static bool is_optional_node(const tparquet::SchemaElement& schema) {
    // An element without an explicit repetition_type is never considered OPTIONAL (nullable).
    if (!schema.__isset.repetition_type) {
        return false;
    }
    return schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL;
}
67
68
522
static bool is_variant_node(const tparquet::SchemaElement& schema) {
    // VARIANT is only expressed through the logicalType annotation (there is no converted_type for it).
    if (!schema.__isset.logicalType) {
        return false;
    }
    return schema.logicalType.__isset.VARIANT;
}
71
72
4
static void mark_variant_subfields(FieldSchema* field) {
    // Set `is_in_variant` on `field` and on every descendant below it.
    // An explicit worklist replaces recursion; every node is still visited exactly once.
    std::vector<FieldSchema*> pending {field};
    while (!pending.empty()) {
        FieldSchema* current = pending.back();
        pending.pop_back();
        current->is_in_variant = true;
        for (auto& sub_field : current->children) {
            pending.push_back(&sub_field);
        }
    }
}
78
79
152
static int num_children_node(const tparquet::SchemaElement& schema) {
    // Treat an absent num_children field as "no children" (a primitive leaf).
    if (!schema.__isset.num_children) {
        return 0;
    }
    return schema.num_children;
}
82
83
/**
84
 * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
85
 * Empty array/map values are not stored in doris columns, so have to use `repeated_parent_def_level` to skip the
86
 * empty or null values in ancestor node.
87
 *
88
 * For instance, considering an array of strings with 3 rows like the following:
89
 * null, [], [a, b, c]
90
 * We can store four elements in data column: null, a, b, c
91
 * and the offsets column is: 1, 1, 4
92
 * and the null map is: 1, 0, 0
93
 * For the i-th row in array column: range from `offsets[i - 1]` until `offsets[i]` represents the elements in this row,
94
 * so we can't store empty array/map values in doris data column.
95
 * As a comparison, spark does not require `repeated_parent_def_level`,
96
 * because the spark column stores empty array/map values , and use anther length column to indicate empty values.
97
 * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
98
 *
99
 * Furthermore, we can also avoid store null array/map values in doris data column.
100
 * The same three rows as above, We can only store three elements in data column: a, b, c
101
 * and the offsets column is: 0, 0, 3
102
 * and the null map is: 1, 0, 0
103
 *
104
 * Inherit the repetition and definition level from parent node, if the parent node is repeated,
105
 * we should set repeated_parent_def_level = definition_level, otherwise as repeated_parent_def_level.
106
 * @param parent parent node
107
 * @param repeated_parent_def_level the first ancestor node whose repetition_type equals REPEATED
108
 */
109
522
static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
110
1.08k
    for (auto& child : parent->children) {
111
1.08k
        child.repetition_level = parent->repetition_level;
112
1.08k
        child.definition_level = parent->definition_level;
113
1.08k
        child.repeated_parent_def_level = repeated_parent_def_level;
114
1.08k
    }
115
522
}
116
117
152
static bool is_struct_list_node(const tparquet::SchemaElement& schema) {
    // Legacy LIST layouts (see parquet-format backward-compatibility rules) name a repeated group
    // "array" or give it a "_tuple" suffix when that group itself is the struct element.
    // Only the name is inspected here; the caller separately checks the child count.
    const std::string& name = schema.name;
    if (name == "array") {
        return true;
    }
    constexpr size_t tuple_suffix_len = 6; // length of "_tuple"
    return name.size() >= tuple_suffix_len &&
           name.compare(name.size() - tuple_suffix_len, tuple_suffix_len, "_tuple") == 0;
}
124
125
0
std::string FieldSchema::debug_string() const {
126
0
    std::stringstream ss;
127
0
    ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level;
128
0
    if (children.size() > 0) {
129
0
        ss << ", type=" << data_type->get_name() << ", children=[";
130
0
        for (int i = 0; i < children.size(); ++i) {
131
0
            if (i != 0) {
132
0
                ss << ", ";
133
0
            }
134
0
            ss << children[i].debug_string();
135
0
        }
136
0
        ss << "]";
137
0
    } else {
138
0
        ss << ", physical_type=" << physical_type;
139
0
        ss << " , doris_type=" << data_type->get_name();
140
0
    }
141
0
    ss << ")";
142
0
    return ss.str();
143
0
}
144
145
110
/**
 * Build the field tree from the flattened (depth-first) list of parquet schema elements.
 * Element 0 must be the root group; its children become the top-level fields.
 * Every element must be consumed exactly once, otherwise the schema is rejected.
 */
Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) {
    if (t_schemas.empty() || !is_group_node(t_schemas[0])) {
        return Status::InvalidArgument("Wrong parquet root schema element");
    }
    const auto& root_schema = t_schemas[0];
    // is_group_node() guarantees num_children > 0. Every top-level field consumes at least one
    // schema element after the root, so a larger declared count can only come from a malformed
    // footer; reject it before using it to size _fields (avoids a huge allocation).
    const auto num_fields = static_cast<size_t>(root_schema.num_children);
    const size_t available = t_schemas.size() - 1;
    if (num_fields > available) {
        return Status::InvalidArgument(
                "Root schema declares {} children but only {} schema elements follow", num_fields,
                available);
    }
    _fields.resize(num_fields);
    _next_schema_pos = 1;

    for (size_t i = 0; i < num_fields; ++i) {
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
        // _fields was sized above and is not resized again in this function, so the stored
        // pointer stays valid. A failed emplace means the name is already taken.
        if (!_name_to_field.emplace(_fields[i].name, &_fields[i]).second) {
            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
        }
    }

    if (_next_schema_pos != t_schemas.size()) {
        return Status::InvalidArgument("Remaining {} unparsed schema elements",
                                       t_schemas.size() - _next_schema_pos);
    }

    return Status::OK();
}
168
169
/**
 * Parse the schema element at `curr_pos` (and, for groups, its subtree) into `node_field`.
 * Groups are delegated to parse_group_field(). A repeated primitive is the legacy two-level list
 * form and becomes a non-null list of required elements. Any other primitive is a plain leaf.
 * On return, _next_schema_pos points to the first element after the parsed subtree.
 */
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                         size_t curr_pos, FieldSchema* node_field) {
    if (curr_pos >= t_schemas.size()) {
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
    }
    auto& t_schema = t_schemas[curr_pos];
    if (is_group_node(t_schema)) {
        // nested structure or nullable list
        return parse_group_field(t_schemas, curr_pos, node_field);
    }
    if (is_repeated_node(t_schema)) {
        // repeated <primitive-type> <name> (LIST)
        // produce required list<element>
        node_field->repetition_level++;
        node_field->definition_level++;
        node_field->children.resize(1);
        set_child_node_level(node_field, node_field->definition_level);
        auto child = &node_field->children[0];
        parse_physical_field(t_schema, false, child);

        node_field->name = t_schema.name;
        node_field->lower_case_name = to_lower(t_schema.name);
        // Initialize column_id like every other non-leaf field built by this descriptor
        // (list, map, struct and repeated-group paths all do this).
        node_field->column_id = UNASSIGNED_COLUMN_ID;
        node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
        _next_schema_pos = curr_pos + 1;
        node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
    } else {
        bool is_optional = is_optional_node(t_schema);
        if (is_optional) {
            // An optional leaf adds one definition level for its null marker.
            node_field->definition_level++;
        }
        parse_physical_field(t_schema, is_optional, node_field);
        _next_schema_pos = curr_pos + 1;
    }
    return Status::OK();
}
204
205
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
                                           bool is_nullable, FieldSchema* physical_field) {
    // Copy the leaf's identity and raw thrift metadata.
    physical_field->name = physical_schema.name;
    physical_field->lower_case_name = to_lower(physical_field->name);
    physical_field->parquet_schema = physical_schema;
    physical_field->physical_type = physical_schema.type;
    physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id

    // Register the leaf; its position in _physical_fields becomes its physical column index.
    _physical_fields.push_back(physical_field);
    const size_t leaf_position = _physical_fields.size() - 1;
    physical_field->physical_column_index = cast_set<int>(leaf_position);

    // Resolve the doris type together with the type-compatibility flag.
    const auto resolved = get_doris_type(physical_schema, is_nullable);
    physical_field->data_type = resolved.first;
    physical_field->is_type_compatibility = resolved.second;

    physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}
219
220
std::pair<DataTypePtr, bool> FieldDescriptor::get_doris_type(
221
1.71k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
222
1.71k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
223
1.71k
    try {
224
1.71k
        if (physical_schema.__isset.logicalType) {
225
716
            ans = convert_to_doris_type(physical_schema.logicalType, nullable);
226
994
        } else if (physical_schema.__isset.converted_type) {
227
226
            ans = convert_to_doris_type(physical_schema, nullable);
228
226
        }
229
1.71k
    } catch (...) {
230
        // now the Not supported exception are ignored
231
        // so those byte_array maybe be treated as varbinary(now) : string(before)
232
0
    }
233
1.71k
    if (ans.first->get_primitive_type() == PrimitiveType::INVALID_TYPE) {
234
768
        switch (physical_schema.type) {
235
93
        case tparquet::Type::BOOLEAN:
236
93
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, nullable);
237
93
            break;
238
191
        case tparquet::Type::INT32:
239
191
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
240
191
            break;
241
125
        case tparquet::Type::INT64:
242
125
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
243
125
            break;
244
12
        case tparquet::Type::INT96:
245
12
            if (_enable_mapping_timestamp_tz) {
246
                // treat INT96 as TIMESTAMPTZ
247
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, nullable,
248
0
                                                                         0, 6);
249
12
            } else {
250
                // in most cases, it's a nano timestamp
251
12
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable,
252
12
                                                                         0, 6);
253
12
            }
254
12
            break;
255
93
        case tparquet::Type::FLOAT:
256
93
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
257
93
            break;
258
205
        case tparquet::Type::DOUBLE:
259
205
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, nullable);
260
205
            break;
261
49
        case tparquet::Type::BYTE_ARRAY:
262
49
            if (_enable_mapping_varbinary) {
263
                // if physical_schema not set logicalType and converted_type,
264
                // we treat BYTE_ARRAY as VARBINARY by default, so that we can read all data directly.
265
22
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable);
266
27
            } else {
267
27
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
268
27
            }
269
49
            break;
270
0
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
271
0
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
272
0
            break;
273
0
        default:
274
0
            throw Exception(Status::InternalError("Not supported parquet logicalType{}",
275
0
                                                  physical_schema.type));
276
0
            break;
277
768
        }
278
768
    }
279
1.71k
    return ans;
280
1.71k
}
281
282
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
283
716
        tparquet::LogicalType logicalType, bool nullable) {
284
716
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
285
716
    bool& is_type_compatibility = ans.second;
286
716
    if (logicalType.__isset.STRING) {
287
472
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
288
472
    } else if (logicalType.__isset.DECIMAL) {
289
114
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, nullable,
290
114
                                                                 logicalType.DECIMAL.precision,
291
114
                                                                 logicalType.DECIMAL.scale);
292
130
    } else if (logicalType.__isset.DATE) {
293
57
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
294
73
    } else if (logicalType.__isset.INTEGER) {
295
0
        if (logicalType.INTEGER.isSigned) {
296
0
            if (logicalType.INTEGER.bitWidth <= 8) {
297
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
298
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
299
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
300
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
301
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
302
0
            } else {
303
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
304
0
            }
305
0
        } else {
306
0
            is_type_compatibility = true;
307
0
            if (logicalType.INTEGER.bitWidth <= 8) {
308
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
309
0
            } else if (logicalType.INTEGER.bitWidth <= 16) {
310
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
311
0
            } else if (logicalType.INTEGER.bitWidth <= 32) {
312
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
313
0
            } else {
314
0
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
315
0
            }
316
0
        }
317
73
    } else if (logicalType.__isset.TIME) {
318
0
        ans.first = DataTypeFactory::instance().create_data_type(
319
0
                TYPE_TIMEV2, nullable, 0, logicalType.TIME.unit.__isset.MILLIS ? 3 : 6);
320
73
    } else if (logicalType.__isset.TIMESTAMP) {
321
69
        if (_enable_mapping_timestamp_tz) {
322
0
            if (logicalType.TIMESTAMP.isAdjustedToUTC) {
323
                // treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
324
0
                ans.first = DataTypeFactory::instance().create_data_type(
325
0
                        TYPE_TIMESTAMPTZ, nullable, 0,
326
0
                        logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
327
0
                return ans;
328
0
            }
329
0
        }
330
69
        ans.first = DataTypeFactory::instance().create_data_type(
331
69
                TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
332
69
    } else if (logicalType.__isset.JSON) {
333
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
334
4
    } else if (logicalType.__isset.UUID) {
335
4
        if (_enable_mapping_varbinary) {
336
3
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1,
337
3
                                                                     -1, 16);
338
3
        } else {
339
1
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
340
1
        }
341
4
    } else if (logicalType.__isset.FLOAT16) {
342
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
343
0
    } else {
344
0
        throw Exception(Status::InternalError("Not supported parquet logicalType"));
345
0
    }
346
716
    return ans;
347
716
}
348
349
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
350
226
        const tparquet::SchemaElement& physical_schema, bool nullable) {
351
226
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
352
226
    bool& is_type_compatibility = ans.second;
353
226
    switch (physical_schema.converted_type) {
354
106
    case tparquet::ConvertedType::type::UTF8:
355
106
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
356
106
        break;
357
24
    case tparquet::ConvertedType::type::DECIMAL:
358
24
        ans.first = DataTypeFactory::instance().create_data_type(
359
24
                TYPE_DECIMAL128I, nullable, physical_schema.precision, physical_schema.scale);
360
24
        break;
361
24
    case tparquet::ConvertedType::type::DATE:
362
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
363
24
        break;
364
0
    case tparquet::ConvertedType::type::TIME_MILLIS:
365
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 3);
366
0
        break;
367
0
    case tparquet::ConvertedType::type::TIME_MICROS:
368
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable, 0, 6);
369
0
        break;
370
0
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
371
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 3);
372
0
        break;
373
24
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
374
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 6);
375
24
        break;
376
24
    case tparquet::ConvertedType::type::INT_8:
377
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
378
24
        break;
379
0
    case tparquet::ConvertedType::type::UINT_8:
380
0
        is_type_compatibility = true;
381
0
        [[fallthrough]];
382
24
    case tparquet::ConvertedType::type::INT_16:
383
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
384
24
        break;
385
0
    case tparquet::ConvertedType::type::UINT_16:
386
0
        is_type_compatibility = true;
387
0
        [[fallthrough]];
388
0
    case tparquet::ConvertedType::type::INT_32:
389
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
390
0
        break;
391
0
    case tparquet::ConvertedType::type::UINT_32:
392
0
        is_type_compatibility = true;
393
0
        [[fallthrough]];
394
0
    case tparquet::ConvertedType::type::INT_64:
395
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
396
0
        break;
397
0
    case tparquet::ConvertedType::type::UINT_64:
398
0
        is_type_compatibility = true;
399
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
400
0
        break;
401
0
    case tparquet::ConvertedType::type::JSON:
402
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
403
0
        break;
404
0
    default:
405
0
        throw Exception(Status::InternalError("Not supported parquet ConvertedType: {}",
406
0
                                              physical_schema.converted_type));
407
226
    }
408
226
    return ans;
409
226
}
410
411
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                          size_t curr_pos, FieldSchema* group_field) {
    // Dispatch a group element by its annotation: VARIANT, MAP, LIST, otherwise a struct.
    const auto& group_schema = t_schemas[curr_pos];

    if (is_variant_node(group_schema)) {
        return parse_variant_field(t_schemas, curr_pos, group_field);
    }
    if (is_map_node(group_schema)) {
        // the map definition:
        // optional group <name> (MAP) {
        //   repeated group map (MAP_KEY_VALUE) {
        //     required <type> key;
        //     optional <type> value;
        //   }
        // }
        return parse_map_field(t_schemas, curr_pos, group_field);
    }
    if (is_list_node(group_schema)) {
        // the list definition:
        // optional group <name> (LIST) {
        //   repeated group [bag | list] { // hive or spark
        //     optional <type> [array_element | element]; // hive or spark
        //   }
        // }
        return parse_list_field(t_schemas, curr_pos, group_field);
    }

    if (!is_repeated_node(group_schema)) {
        // A plain (non-repeated) group is an ordinary struct.
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
        return Status::OK();
    }

    // Any other repeated group is a list of structs:
    // repeated group <name> {
    //   optional/required <type> <name>;
    //   ...
    // }
    // It produces a non-null list<struct>.
    group_field->repetition_level++;
    group_field->definition_level++;
    group_field->children.resize(1);
    set_child_node_level(group_field, group_field->definition_level);
    FieldSchema* element = &group_field->children[0];
    RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, element));

    group_field->name = group_schema.name;
    group_field->lower_case_name = to_lower(group_field->name);
    group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    group_field->data_type = std::make_shared<DataTypeArray>(make_nullable(element->data_type));
    group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
    return Status::OK();
}
463
464
Status FieldDescriptor::parse_variant_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                            size_t curr_pos, FieldSchema* variant_field) {
    // Parse the VARIANT group's children like a struct, then validate its layout: a required
    // binary `metadata`, plus a binary `value` and/or a `typed_value`. Any other child is rejected.
    RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, variant_field));

    bool metadata_is_binary = false;
    bool metadata_is_required = false;
    bool saw_value = false;
    bool saw_typed_value = false;
    for (const auto& child : variant_field->children) {
        const auto& child_name = child.lower_case_name;
        if (child_name == "metadata") {
            metadata_is_binary = child.physical_type == tparquet::Type::BYTE_ARRAY;
            metadata_is_required = !child.data_type->is_nullable();
            continue;
        }
        if (child_name == "value") {
            if (child.physical_type != tparquet::Type::BYTE_ARRAY) {
                return Status::InvalidArgument(
                        "Parquet VARIANT field '{}' value child must be binary",
                        variant_field->name);
            }
            saw_value = true;
            continue;
        }
        if (child_name == "typed_value") {
            saw_typed_value = true;
            continue;
        }
        return Status::InvalidArgument("Parquet VARIANT field '{}' has unexpected child '{}'",
                                       variant_field->name, child.name);
    }

    const bool well_formed =
            metadata_is_binary && metadata_is_required && (saw_value || saw_typed_value);
    if (!well_formed) {
        return Status::InvalidArgument(
                "Parquet VARIANT field '{}' must contain required binary metadata and at least one "
                "binary value or typed_value field",
                variant_field->name);
    }

    // The struct type built above is replaced by a VARIANT type, nullable when the group is optional.
    variant_field->data_type = std::make_shared<DataTypeVariant>(0, false);
    if (is_optional_node(t_schemas[curr_pos])) {
        variant_field->data_type = make_nullable(variant_field->data_type);
    }
    mark_variant_subfields(variant_field);
    return Status::OK();
}
504
505
Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                         size_t curr_pos, FieldSchema* list_field) {
    // the list definition:
    // spark and hive have three level schemas but with different schema name
    // spark: <column-name> - "list" - "element"
    // hive: <column-name> - "bag" - "array_element"
    // parse three level schemas to two level primitive like: LIST<INT>,
    // or nested structure like: LIST<MAP<INT, INT>>
    auto& first_level = t_schemas[curr_pos];
    if (first_level.num_children != 1) {
        return Status::InvalidArgument("List element should have only one child");
    }

    if (curr_pos + 1 >= t_schemas.size()) {
        return Status::InvalidArgument("List element should have the second level schema");
    }

    if (first_level.repetition_type == tparquet::FieldRepetitionType::REPEATED) {
        return Status::InvalidArgument("List element can't be a repeated schema");
    }

    // the repeated schema element
    auto& second_level = t_schemas[curr_pos + 1];
    if (second_level.repetition_type != tparquet::FieldRepetitionType::REPEATED) {
        return Status::InvalidArgument("The second level of list element should be repeated");
    }

    // Keep the child count signed. A negative value can only come from a malformed footer; if it
    // were converted to size_t it would become a huge count and reach children.resize() below.
    const int num_children = num_children_node(second_level);
    if (num_children < 0) {
        return Status::InvalidArgument(
                "The second level of list element has a negative number of children: {}",
                num_children);
    }

    // This indicates if this list is nullable.
    bool is_optional = is_optional_node(first_level);
    if (is_optional) {
        list_field->definition_level++;
    }
    list_field->repetition_level++;
    list_field->definition_level++;
    list_field->children.resize(1);
    FieldSchema* list_child = &list_field->children[0];
    // The list is repeated, so its element's repeated_parent_def_level is the list's definition level.
    set_child_node_level(list_field, list_field->definition_level);

    if (num_children == 0) {
        // required two level list, for compatibility reason.
        parse_physical_field(second_level, false, list_child);
        _next_schema_pos = curr_pos + 2;
    } else if (num_children == 1 && !is_struct_list_node(second_level)) {
        // optional field, and the third level element is the nested structure in list
        // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>>
        // skip bag/list, it's a repeated element.
        RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child));
    } else {
        // required field, produce the list of struct
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child));
    }

    list_field->name = first_level.name;
    list_field->lower_case_name = to_lower(first_level.name);
    list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    list_field->data_type =
            std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
    if (is_optional) {
        list_field->data_type = make_nullable(list_field->data_type);
    }
    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

    return Status::OK();
}
574
575
Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                        size_t curr_pos, FieldSchema* map_field) {
    // the map definition in parquet:
    // optional group <name> (MAP) {
    //   repeated group map (MAP_KEY_VALUE) {
    //     required <type> key;
    //     optional <type> value;
    //   }
    // }
    // Map value can be optional, the map without values is a SET
    //
    // The intermediate key_value group is not materialized: a standard map gets exactly two
    // children (key, value) plus one extra repetition level for the repeated key_value group.
    if (curr_pos + 2 >= t_schemas.size()) {
        return Status::InvalidArgument("Map element should have at least three levels");
    }
    auto& map_schema = t_schemas[curr_pos];
    if (map_schema.num_children != 1) {
        return Status::InvalidArgument(
                "Map element should have only one child(name='map', type='MAP_KEY_VALUE')");
    }
    if (is_repeated_node(map_schema)) {
        return Status::InvalidArgument("Map element can't be a repeated schema");
    }
    auto& map_key_value = t_schemas[curr_pos + 1];
    if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) {
        return Status::InvalidArgument(
                "the second level in map must be a repeated group(key and value)");
    }
    auto& map_key = t_schemas[curr_pos + 2];
    if (!is_required_node(map_key)) {
        // Nullable keys are tolerated, only logged.
        LOG(WARNING) << "Field " << map_schema.name << " is map type, but with nullable key column";
    }

    if (map_key_value.num_children == 1) {
        // The map with three levels is a SET
        return parse_list_field(t_schemas, curr_pos, map_field);
    }
    if (map_key_value.num_children != 2) {
        // A standard map should have four levels
        return Status::InvalidArgument(
                "the second level in map(MAP_KEY_VALUE) should have two children");
    }
    // standard map
    bool is_optional = is_optional_node(map_schema);
    if (is_optional) {
        map_field->definition_level++;
    }
    map_field->repetition_level++;
    map_field->definition_level++;

    // Directly create key and value children instead of intermediate key_value node
    map_field->children.resize(2);
    // map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
    set_child_node_level(map_field, map_field->definition_level);

    auto key_field = &map_field->children[0];
    auto value_field = &map_field->children[1];

    // Parse key and value fields directly from the key_value group's children
    _next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));

    map_field->name = map_schema.name;
    map_field->lower_case_name = to_lower(map_field->name);
    map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
    map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
                                                         make_nullable(value_field->data_type));
    if (is_optional) {
        map_field->data_type = make_nullable(map_field->data_type);
    }
    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

    return Status::OK();
}
648
649
/**
 * Parse the group at `curr_pos` and its children into a doris STRUCT.
 * Children inherit the struct's levels and its repeated_parent_def_level. The struct is nullable
 * when the group is OPTIONAL, and each member type is wrapped as nullable.
 */
Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                           size_t curr_pos, FieldSchema* struct_field) {
    // the nested column in parquet, parse group to struct.
    if (curr_pos >= t_schemas.size()) {
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
    }
    auto& struct_schema = t_schemas[curr_pos];
    const auto num_children = struct_schema.num_children;
    // Every child consumes at least one schema element after this one. A negative count, or one
    // larger than the remaining elements, can only come from a malformed footer; reject it before
    // it reaches children.resize() (which would throw or allocate a huge vector).
    const size_t remaining = t_schemas.size() - curr_pos - 1;
    if (num_children < 0 || static_cast<size_t>(num_children) > remaining) {
        return Status::InvalidArgument(
                "Struct element '{}' declares {} children but only {} schema elements follow",
                struct_schema.name, num_children, remaining);
    }

    bool is_optional = is_optional_node(struct_schema);
    if (is_optional) {
        struct_field->definition_level++;
    }
    struct_field->children.resize(num_children);
    // A struct is not repeated, so it forwards its own repeated_parent_def_level to its children.
    set_child_node_level(struct_field, struct_field->repeated_parent_def_level);
    _next_schema_pos = curr_pos + 1;
    for (int i = 0; i < num_children; ++i) {
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
    }
    struct_field->name = struct_schema.name;
    struct_field->lower_case_name = to_lower(struct_field->name);
    struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id

    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
    DataTypes res_data_types;
    std::vector<String> names;
    res_data_types.reserve(num_children);
    names.reserve(num_children);
    for (const auto& child : struct_field->children) {
        res_data_types.push_back(make_nullable(child.data_type));
        names.push_back(child.name);
    }
    struct_field->data_type = std::make_shared<DataTypeStruct>(res_data_types, names);
    if (is_optional) {
        struct_field->data_type = make_nullable(struct_field->data_type);
    }
    return Status::OK();
}
681
682
5
int FieldDescriptor::get_column_index(const std::string& column) const {
683
15
    for (int32_t i = 0; i < _fields.size(); i++) {
684
15
        if (_fields[i].name == column) {
685
5
            return i;
686
5
        }
687
15
    }
688
0
    return -1;
689
5
}
690
691
1.02k
FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
692
1.02k
    auto it = _name_to_field.find(name);
693
1.02k
    if (it != _name_to_field.end()) {
694
1.02k
        return it->second;
695
1.02k
    }
696
0
    throw Exception(Status::InternalError("Name {} not found in FieldDescriptor!", name));
697
0
    return nullptr;
698
1.02k
}
699
700
14
void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const {
701
14
    names->clear();
702
210
    for (const FieldSchema& f : _fields) {
703
210
        names->emplace(f.name);
704
210
    }
705
14
}
706
707
0
std::string FieldDescriptor::debug_string() const {
708
0
    std::stringstream ss;
709
0
    ss << "fields=[";
710
0
    for (int i = 0; i < _fields.size(); ++i) {
711
0
        if (i != 0) {
712
0
            ss << ", ";
713
0
        }
714
0
        ss << _fields[i].debug_string();
715
0
    }
716
0
    ss << "]";
717
0
    return ss.str();
718
0
}
719
720
40
void FieldDescriptor::assign_ids() {
721
40
    uint64_t next_id = 1;
722
371
    for (auto& field : _fields) {
723
371
        field.assign_ids(next_id);
724
371
    }
725
40
}
726
727
0
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
728
0
    for (const auto& field : _fields) {
729
0
        if (auto result = field.find_column_by_id(column_id)) {
730
0
            return result;
731
0
        }
732
0
    }
733
0
    return nullptr;
734
0
}
735
736
1.98k
void FieldSchema::assign_ids(uint64_t& next_id) {
737
1.98k
    column_id = next_id++;
738
739
1.98k
    for (auto& child : children) {
740
1.60k
        child.assign_ids(next_id);
741
1.60k
    }
742
743
1.98k
    max_column_id = next_id - 1;
744
1.98k
}
745
746
0
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
747
0
    if (column_id == target_id) {
748
0
        return this;
749
0
    }
750
751
0
    for (const auto& child : children) {
752
0
        if (auto result = child.find_column_by_id(target_id)) {
753
0
            return result;
754
0
        }
755
0
    }
756
757
0
    return nullptr;
758
0
}
759
760
571
uint64_t FieldSchema::get_column_id() const {
761
571
    return column_id;
762
571
}
763
764
0
void FieldSchema::set_column_id(uint64_t id) {
765
0
    column_id = id;
766
0
}
767
768
105
uint64_t FieldSchema::get_max_column_id() const {
769
105
    return max_column_id;
770
105
}
771
772
} // namespace doris