Coverage Report

Created: 2026-04-10 16:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/schema_desc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/schema_desc.h"
19
20
#include <ctype.h>
21
22
#include <algorithm>
23
#include <ostream>
24
#include <utility>
25
26
#include "common/cast_set.h"
27
#include "common/logging.h"
28
#include "core/data_type/data_type_array.h"
29
#include "core/data_type/data_type_factory.hpp"
30
#include "core/data_type/data_type_map.h"
31
#include "core/data_type/data_type_struct.h"
32
#include "core/data_type/define_primitive_type.h"
33
#include "format/table/table_format_reader.h"
34
#include "util/slice.h"
35
#include "util/string_util.h"
36
37
namespace doris {
38
39
137k
static bool is_group_node(const tparquet::SchemaElement& schema) {
40
137k
    return schema.num_children > 0;
41
137k
}
42
43
16.3k
static bool is_list_node(const tparquet::SchemaElement& schema) {
44
16.3k
    return schema.__isset.converted_type && schema.converted_type == tparquet::ConvertedType::LIST;
45
16.3k
}
46
47
24.0k
static bool is_map_node(const tparquet::SchemaElement& schema) {
48
24.0k
    return schema.__isset.converted_type &&
49
24.0k
           (schema.converted_type == tparquet::ConvertedType::MAP ||
50
18.1k
            schema.converted_type == tparquet::ConvertedType::MAP_KEY_VALUE);
51
24.0k
}
52
53
116k
static bool is_repeated_node(const tparquet::SchemaElement& schema) {
54
116k
    return schema.__isset.repetition_type &&
55
116k
           schema.repetition_type == tparquet::FieldRepetitionType::REPEATED;
56
116k
}
57
58
7.77k
static bool is_required_node(const tparquet::SchemaElement& schema) {
59
7.77k
    return schema.__isset.repetition_type &&
60
7.77k
           schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED;
61
7.77k
}
62
63
118k
static bool is_optional_node(const tparquet::SchemaElement& schema) {
64
118k
    return schema.__isset.repetition_type &&
65
119k
           schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL;
66
118k
}
67
68
10.3k
static int num_children_node(const tparquet::SchemaElement& schema) {
69
10.3k
    return schema.__isset.num_children ? schema.num_children : 0;
70
10.3k
}
71
72
/**
73
 * `repeated_parent_def_level` is the definition level of the first ancestor node whose repetition_type equals REPEATED.
74
 * Empty array/map values are not stored in doris columns, so have to use `repeated_parent_def_level` to skip the
75
 * empty or null values in ancestor node.
76
 *
77
 * For instance, considering an array of strings with 3 rows like the following:
78
 * null, [], [a, b, c]
79
 * We can store four elements in data column: null, a, b, c
80
 * and the offsets column is: 1, 1, 4
81
 * and the null map is: 1, 0, 0
82
 * For the i-th row in array column: range from `offsets[i - 1]` until `offsets[i]` represents the elements in this row,
83
 * so we can't store empty array/map values in doris data column.
84
 * As a comparison, spark does not require `repeated_parent_def_level`,
85
 * because the spark column stores empty array/map values , and use anther length column to indicate empty values.
86
 * Please reference: https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
87
 *
88
 * Furthermore, we can also avoid store null array/map values in doris data column.
89
 * The same three rows as above, We can only store three elements in data column: a, b, c
90
 * and the offsets column is: 0, 0, 3
91
 * and the null map is: 1, 0, 0
92
 *
93
 * Inherit the repetition and definition level from parent node, if the parent node is repeated,
94
 * we should set repeated_parent_def_level = definition_level, otherwise as repeated_parent_def_level.
95
 * @param parent parent node
96
 * @param repeated_parent_def_level the first ancestor node whose repetition_type equals REPEATED
97
 */
98
24.1k
static void set_child_node_level(FieldSchema* parent, int16_t repeated_parent_def_level) {
99
40.5k
    for (auto& child : parent->children) {
100
40.5k
        child.repetition_level = parent->repetition_level;
101
40.5k
        child.definition_level = parent->definition_level;
102
40.5k
        child.repeated_parent_def_level = repeated_parent_def_level;
103
40.5k
    }
104
24.1k
}
105
106
10.3k
static bool is_struct_list_node(const tparquet::SchemaElement& schema) {
107
10.3k
    const std::string& name = schema.name;
108
10.3k
    static const Slice array_slice("array", 5);
109
10.3k
    static const Slice tuple_slice("_tuple", 6);
110
10.3k
    Slice slice(name);
111
10.3k
    return slice == array_slice || slice.ends_with(tuple_slice);
112
10.3k
}
113
114
0
std::string FieldSchema::debug_string() const {
115
0
    std::stringstream ss;
116
0
    ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level;
117
0
    if (children.size() > 0) {
118
0
        ss << ", type=" << data_type->get_name() << ", children=[";
119
0
        for (int i = 0; i < children.size(); ++i) {
120
0
            if (i != 0) {
121
0
                ss << ", ";
122
0
            }
123
0
            ss << children[i].debug_string();
124
0
        }
125
0
        ss << "]";
126
0
    } else {
127
0
        ss << ", physical_type=" << physical_type;
128
0
        ss << " , doris_type=" << data_type->get_name();
129
0
    }
130
0
    ss << ")";
131
0
    return ss.str();
132
0
}
133
134
11.2k
Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) {
135
11.2k
    if (t_schemas.size() == 0 || !is_group_node(t_schemas[0])) {
136
0
        return Status::InvalidArgument("Wrong parquet root schema element");
137
0
    }
138
11.2k
    const auto& root_schema = t_schemas[0];
139
11.2k
    _fields.resize(root_schema.num_children);
140
11.2k
    _next_schema_pos = 1;
141
142
90.0k
    for (int i = 0; i < root_schema.num_children; ++i) {
143
78.8k
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
144
78.8k
        if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
145
0
            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
146
0
        }
147
78.8k
        _name_to_field.emplace(_fields[i].name, &_fields[i]);
148
78.8k
    }
149
150
11.2k
    if (_next_schema_pos != t_schemas.size()) {
151
0
        return Status::InvalidArgument("Remaining {} unparsed schema elements",
152
0
                                       t_schemas.size() - _next_schema_pos);
153
0
    }
154
155
11.2k
    return Status::OK();
156
11.2k
}
157
158
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
159
118k
                                         size_t curr_pos, FieldSchema* node_field) {
160
118k
    if (curr_pos >= t_schemas.size()) {
161
0
        return Status::InvalidArgument("Out-of-bounds index of schema elements");
162
0
    }
163
118k
    auto& t_schema = t_schemas[curr_pos];
164
118k
    if (is_group_node(t_schema)) {
165
        // nested structure or nullable list
166
24.0k
        return parse_group_field(t_schemas, curr_pos, node_field);
167
24.0k
    }
168
94.8k
    if (is_repeated_node(t_schema)) {
169
        // repeated <primitive-type> <name> (LIST)
170
        // produce required list<element>
171
36
        node_field->repetition_level++;
172
36
        node_field->definition_level++;
173
36
        node_field->children.resize(1);
174
36
        set_child_node_level(node_field, node_field->definition_level);
175
36
        auto child = &node_field->children[0];
176
36
        parse_physical_field(t_schema, false, child);
177
178
36
        node_field->name = t_schema.name;
179
36
        node_field->lower_case_name = to_lower(t_schema.name);
180
36
        node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
181
36
        _next_schema_pos = curr_pos + 1;
182
36
        node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
183
94.8k
    } else {
184
94.8k
        bool is_optional = is_optional_node(t_schema);
185
94.8k
        if (is_optional) {
186
71.2k
            node_field->definition_level++;
187
71.2k
        }
188
94.8k
        parse_physical_field(t_schema, is_optional, node_field);
189
94.8k
        _next_schema_pos = curr_pos + 1;
190
94.8k
    }
191
94.8k
    return Status::OK();
192
118k
}
193
194
void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
195
94.9k
                                           bool is_nullable, FieldSchema* physical_field) {
196
94.9k
    physical_field->name = physical_schema.name;
197
94.9k
    physical_field->lower_case_name = to_lower(physical_field->name);
198
94.9k
    physical_field->parquet_schema = physical_schema;
199
94.9k
    physical_field->physical_type = physical_schema.type;
200
94.9k
    physical_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
201
94.9k
    _physical_fields.push_back(physical_field);
202
94.9k
    physical_field->physical_column_index = cast_set<int>(_physical_fields.size() - 1);
203
94.9k
    auto type = get_doris_type(physical_schema, is_nullable);
204
94.9k
    physical_field->data_type = type.first;
205
94.9k
    physical_field->is_type_compatibility = type.second;
206
94.9k
    physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
207
94.9k
}
208
209
std::pair<DataTypePtr, bool> FieldDescriptor::get_doris_type(
210
95.0k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
211
95.0k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
212
95.0k
    try {
213
95.0k
        if (physical_schema.__isset.logicalType) {
214
39.7k
            ans = convert_to_doris_type(physical_schema.logicalType, nullable);
215
55.2k
        } else if (physical_schema.__isset.converted_type) {
216
11.5k
            ans = convert_to_doris_type(physical_schema, nullable);
217
11.5k
        }
218
95.0k
    } catch (...) {
219
        // now the Not supported exception are ignored
220
        // so those byte_array maybe be treated as varbinary(now) : string(before)
221
54
    }
222
95.0k
    if (ans.first->get_primitive_type() == PrimitiveType::INVALID_TYPE) {
223
43.9k
        switch (physical_schema.type) {
224
3.40k
        case tparquet::Type::BOOLEAN:
225
3.40k
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BOOLEAN, nullable);
226
3.40k
            break;
227
17.2k
        case tparquet::Type::INT32:
228
17.2k
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
229
17.2k
            break;
230
10.0k
        case tparquet::Type::INT64:
231
10.0k
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
232
10.0k
            break;
233
1.64k
        case tparquet::Type::INT96:
234
1.64k
            if (_enable_mapping_timestamp_tz) {
235
                // treat INT96 as TIMESTAMPTZ
236
32
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, nullable,
237
32
                                                                         0, 6);
238
1.61k
            } else {
239
                // in most cases, it's a nano timestamp
240
1.61k
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable,
241
1.61k
                                                                         0, 6);
242
1.61k
            }
243
1.64k
            break;
244
5.01k
        case tparquet::Type::FLOAT:
245
5.01k
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
246
5.01k
            break;
247
5.44k
        case tparquet::Type::DOUBLE:
248
5.44k
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_DOUBLE, nullable);
249
5.44k
            break;
250
1.05k
        case tparquet::Type::BYTE_ARRAY:
251
1.05k
            if (_enable_mapping_varbinary) {
252
                // if physical_schema not set logicalType and converted_type,
253
                // we treat BYTE_ARRAY as VARBINARY by default, so that we can read all data directly.
254
122
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable);
255
936
            } else {
256
936
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
257
936
            }
258
1.05k
            break;
259
22
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
260
22
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
261
22
            break;
262
0
        default:
263
0
            throw Exception(Status::InternalError("Not supported parquet logicalType{}",
264
0
                                                  physical_schema.type));
265
0
            break;
266
43.9k
        }
267
43.9k
    }
268
95.0k
    return ans;
269
95.0k
}
270
271
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
272
39.7k
        tparquet::LogicalType logicalType, bool nullable) {
273
39.7k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
274
39.7k
    bool& is_type_compatibility = ans.second;
275
39.7k
    if (logicalType.__isset.STRING) {
276
21.9k
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
277
21.9k
    } else if (logicalType.__isset.DECIMAL) {
278
10.6k
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DECIMAL128I, nullable,
279
10.6k
                                                                 logicalType.DECIMAL.precision,
280
10.6k
                                                                 logicalType.DECIMAL.scale);
281
10.6k
    } else if (logicalType.__isset.DATE) {
282
2.21k
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
283
4.95k
    } else if (logicalType.__isset.INTEGER) {
284
2.27k
        if (logicalType.INTEGER.isSigned) {
285
2.01k
            if (logicalType.INTEGER.bitWidth <= 8) {
286
626
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
287
1.38k
            } else if (logicalType.INTEGER.bitWidth <= 16) {
288
940
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
289
940
            } else if (logicalType.INTEGER.bitWidth <= 32) {
290
198
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
291
246
            } else {
292
246
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
293
246
            }
294
2.01k
        } else {
295
264
            is_type_compatibility = true;
296
264
            if (logicalType.INTEGER.bitWidth <= 8) {
297
66
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
298
198
            } else if (logicalType.INTEGER.bitWidth <= 16) {
299
66
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
300
132
            } else if (logicalType.INTEGER.bitWidth <= 32) {
301
70
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
302
70
            } else {
303
62
                ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
304
62
            }
305
264
        }
306
2.67k
    } else if (logicalType.__isset.TIME) {
307
40
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable);
308
2.63k
    } else if (logicalType.__isset.TIMESTAMP) {
309
2.57k
        if (_enable_mapping_timestamp_tz) {
310
48
            if (logicalType.TIMESTAMP.isAdjustedToUTC) {
311
                // treat TIMESTAMP with isAdjustedToUTC as TIMESTAMPTZ
312
48
                ans.first = DataTypeFactory::instance().create_data_type(
313
48
                        TYPE_TIMESTAMPTZ, nullable, 0,
314
48
                        logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
315
48
                return ans;
316
48
            }
317
48
        }
318
2.53k
        ans.first = DataTypeFactory::instance().create_data_type(
319
2.53k
                TYPE_DATETIMEV2, nullable, 0, logicalType.TIMESTAMP.unit.__isset.MILLIS ? 3 : 6);
320
2.53k
    } else if (logicalType.__isset.JSON) {
321
4
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
322
54
    } else if (logicalType.__isset.UUID) {
323
14
        if (_enable_mapping_varbinary) {
324
9
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_VARBINARY, nullable, -1,
325
9
                                                                     -1, 16);
326
9
        } else {
327
5
            ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
328
5
        }
329
40
    } else if (logicalType.__isset.FLOAT16) {
330
20
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_FLOAT, nullable);
331
20
    } else {
332
20
        throw Exception(Status::InternalError("Not supported parquet logicalType"));
333
20
    }
334
39.6k
    return ans;
335
39.7k
}
336
337
std::pair<DataTypePtr, bool> FieldDescriptor::convert_to_doris_type(
338
11.5k
        const tparquet::SchemaElement& physical_schema, bool nullable) {
339
11.5k
    std::pair<DataTypePtr, bool> ans = {std::make_shared<DataTypeNothing>(), false};
340
11.5k
    bool& is_type_compatibility = ans.second;
341
11.5k
    switch (physical_schema.converted_type) {
342
7.42k
    case tparquet::ConvertedType::type::UTF8:
343
7.42k
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
344
7.42k
        break;
345
1.65k
    case tparquet::ConvertedType::type::DECIMAL:
346
1.65k
        ans.first = DataTypeFactory::instance().create_data_type(
347
1.65k
                TYPE_DECIMAL128I, nullable, physical_schema.precision, physical_schema.scale);
348
1.65k
        break;
349
1.08k
    case tparquet::ConvertedType::type::DATE:
350
1.08k
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATEV2, nullable);
351
1.08k
        break;
352
0
    case tparquet::ConvertedType::type::TIME_MILLIS:
353
0
        [[fallthrough]];
354
0
    case tparquet::ConvertedType::type::TIME_MICROS:
355
0
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TIMEV2, nullable);
356
0
        break;
357
24
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
358
24
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 3);
359
24
        break;
360
52
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
361
52
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_DATETIMEV2, nullable, 0, 6);
362
52
        break;
363
342
    case tparquet::ConvertedType::type::INT_8:
364
342
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_TINYINT, nullable);
365
342
        break;
366
8
    case tparquet::ConvertedType::type::UINT_8:
367
8
        is_type_compatibility = true;
368
8
        [[fallthrough]];
369
357
    case tparquet::ConvertedType::type::INT_16:
370
357
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_SMALLINT, nullable);
371
357
        break;
372
8
    case tparquet::ConvertedType::type::UINT_16:
373
8
        is_type_compatibility = true;
374
8
        [[fallthrough]];
375
40
    case tparquet::ConvertedType::type::INT_32:
376
40
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_INT, nullable);
377
40
        break;
378
24
    case tparquet::ConvertedType::type::UINT_32:
379
24
        is_type_compatibility = true;
380
24
        [[fallthrough]];
381
356
    case tparquet::ConvertedType::type::INT_64:
382
356
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_BIGINT, nullable);
383
356
        break;
384
148
    case tparquet::ConvertedType::type::UINT_64:
385
148
        is_type_compatibility = true;
386
148
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_LARGEINT, nullable);
387
148
        break;
388
4
    case tparquet::ConvertedType::type::JSON:
389
4
        ans.first = DataTypeFactory::instance().create_data_type(TYPE_STRING, nullable);
390
4
        break;
391
22
    default:
392
22
        throw Exception(Status::InternalError("Not supported parquet ConvertedType: {}",
393
22
                                              physical_schema.converted_type));
394
11.5k
    }
395
11.4k
    return ans;
396
11.5k
}
397
398
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
399
24.0k
                                          size_t curr_pos, FieldSchema* group_field) {
400
24.0k
    auto& group_schema = t_schemas[curr_pos];
401
24.0k
    if (is_map_node(group_schema)) {
402
        // the map definition:
403
        // optional group <name> (MAP) {
404
        //   repeated group map (MAP_KEY_VALUE) {
405
        //     required <type> key;
406
        //     optional <type> value;
407
        //   }
408
        // }
409
7.78k
        return parse_map_field(t_schemas, curr_pos, group_field);
410
7.78k
    }
411
16.3k
    if (is_list_node(group_schema)) {
412
        // the list definition:
413
        // optional group <name> (LIST) {
414
        //   repeated group [bag | list] { // hive or spark
415
        //     optional <type> [array_element | element]; // hive or spark
416
        //   }
417
        // }
418
10.3k
        return parse_list_field(t_schemas, curr_pos, group_field);
419
10.3k
    }
420
421
5.95k
    if (is_repeated_node(group_schema)) {
422
32
        group_field->repetition_level++;
423
32
        group_field->definition_level++;
424
32
        group_field->children.resize(1);
425
32
        set_child_node_level(group_field, group_field->definition_level);
426
32
        auto struct_field = &group_field->children[0];
427
        // the list of struct:
428
        // repeated group <name> (LIST) {
429
        //   optional/required <type> <name>;
430
        //   ...
431
        // }
432
        // produce a non-null list<struct>
433
32
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field));
434
435
32
        group_field->name = group_schema.name;
436
32
        group_field->lower_case_name = to_lower(group_field->name);
437
32
        group_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
438
32
        group_field->data_type =
439
32
                std::make_shared<DataTypeArray>(make_nullable(struct_field->data_type));
440
32
        group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
441
5.92k
    } else {
442
5.92k
        RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
443
5.92k
    }
444
445
5.95k
    return Status::OK();
446
5.95k
}
447
448
Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas,
449
10.3k
                                         size_t curr_pos, FieldSchema* list_field) {
450
    // the list definition:
451
    // spark and hive have three level schemas but with different schema name
452
    // spark: <column-name> - "list" - "element"
453
    // hive: <column-name> - "bag" - "array_element"
454
    // parse three level schemas to two level primitive like: LIST<INT>,
455
    // or nested structure like: LIST<MAP<INT, INT>>
456
10.3k
    auto& first_level = t_schemas[curr_pos];
457
10.3k
    if (first_level.num_children != 1) {
458
0
        return Status::InvalidArgument("List element should have only one child");
459
0
    }
460
461
10.3k
    if (curr_pos + 1 >= t_schemas.size()) {
462
0
        return Status::InvalidArgument("List element should have the second level schema");
463
0
    }
464
465
10.3k
    if (first_level.repetition_type == tparquet::FieldRepetitionType::REPEATED) {
466
0
        return Status::InvalidArgument("List element can't be a repeated schema");
467
0
    }
468
469
    // the repeated schema element
470
10.3k
    auto& second_level = t_schemas[curr_pos + 1];
471
10.3k
    if (second_level.repetition_type != tparquet::FieldRepetitionType::REPEATED) {
472
0
        return Status::InvalidArgument("The second level of list element should be repeated");
473
0
    }
474
475
    // This indicates if this list is nullable.
476
10.3k
    bool is_optional = is_optional_node(first_level);
477
10.3k
    if (is_optional) {
478
9.43k
        list_field->definition_level++;
479
9.43k
    }
480
10.3k
    list_field->repetition_level++;
481
10.3k
    list_field->definition_level++;
482
10.3k
    list_field->children.resize(1);
483
10.3k
    FieldSchema* list_child = &list_field->children[0];
484
485
10.3k
    size_t num_children = num_children_node(second_level);
486
10.3k
    if (num_children > 0) {
487
10.3k
        if (num_children == 1 && !is_struct_list_node(second_level)) {
488
            // optional field, and the third level element is the nested structure in list
489
            // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>>
490
            // skip bag/list, it's a repeated element.
491
10.2k
            set_child_node_level(list_field, list_field->definition_level);
492
10.2k
            RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child));
493
10.2k
        } else {
494
            // required field, produce the list of struct
495
36
            set_child_node_level(list_field, list_field->definition_level);
496
36
            RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child));
497
36
        }
498
10.3k
    } else if (num_children == 0) {
499
        // required two level list, for compatibility reason.
500
48
        set_child_node_level(list_field, list_field->definition_level);
501
48
        parse_physical_field(second_level, false, list_child);
502
48
        _next_schema_pos = curr_pos + 2;
503
48
    }
504
505
10.3k
    list_field->name = first_level.name;
506
10.3k
    list_field->lower_case_name = to_lower(first_level.name);
507
10.3k
    list_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
508
10.3k
    list_field->data_type =
509
10.3k
            std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
510
10.3k
    if (is_optional) {
511
9.44k
        list_field->data_type = make_nullable(list_field->data_type);
512
9.44k
    }
513
10.3k
    list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;
514
515
10.3k
    return Status::OK();
516
10.3k
}
517
518
Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas,
519
7.77k
                                        size_t curr_pos, FieldSchema* map_field) {
520
    // the map definition in parquet:
521
    // optional group <name> (MAP) {
522
    //   repeated group map (MAP_KEY_VALUE) {
523
    //     required <type> key;
524
    //     optional <type> value;
525
    //   }
526
    // }
527
    // Map value can be optional, the map without values is a SET
528
7.77k
    if (curr_pos + 2 >= t_schemas.size()) {
529
0
        return Status::InvalidArgument("Map element should have at least three levels");
530
0
    }
531
7.77k
    auto& map_schema = t_schemas[curr_pos];
532
7.77k
    if (map_schema.num_children != 1) {
533
2
        return Status::InvalidArgument(
534
2
                "Map element should have only one child(name='map', type='MAP_KEY_VALUE')");
535
2
    }
536
7.77k
    if (is_repeated_node(map_schema)) {
537
0
        return Status::InvalidArgument("Map element can't be a repeated schema");
538
0
    }
539
7.77k
    auto& map_key_value = t_schemas[curr_pos + 1];
540
7.77k
    if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) {
541
0
        return Status::InvalidArgument(
542
0
                "the second level in map must be a repeated group(key and value)");
543
0
    }
544
7.77k
    auto& map_key = t_schemas[curr_pos + 2];
545
7.77k
    if (!is_required_node(map_key)) {
546
436
        LOG(WARNING) << "Filed " << map_schema.name << " is map type, but with nullable key column";
547
436
    }
548
549
7.77k
    if (map_key_value.num_children == 1) {
550
        // The map with three levels is a SET
551
0
        return parse_list_field(t_schemas, curr_pos, map_field);
552
0
    }
553
7.77k
    if (map_key_value.num_children != 2) {
554
        // A standard map should have four levels
555
0
        return Status::InvalidArgument(
556
0
                "the second level in map(MAP_KEY_VALUE) should have two children");
557
0
    }
558
    // standard map
559
7.77k
    bool is_optional = is_optional_node(map_schema);
560
7.77k
    if (is_optional) {
561
7.11k
        map_field->definition_level++;
562
7.11k
    }
563
7.77k
    map_field->repetition_level++;
564
7.77k
    map_field->definition_level++;
565
566
    // Directly create key and value children instead of intermediate key_value node
567
7.77k
    map_field->children.resize(2);
568
    // map is a repeated node, we should set the `repeated_parent_def_level` of its children as `definition_level`
569
7.77k
    set_child_node_level(map_field, map_field->definition_level);
570
571
7.77k
    auto key_field = &map_field->children[0];
572
7.77k
    auto value_field = &map_field->children[1];
573
574
    // Parse key and value fields directly from the key_value group's children
575
7.77k
    _next_schema_pos = curr_pos + 2; // Skip key_value group, go directly to key
576
7.77k
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, key_field));
577
7.77k
    RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, value_field));
578
579
7.77k
    map_field->name = map_schema.name;
580
7.77k
    map_field->lower_case_name = to_lower(map_field->name);
581
7.77k
    map_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
582
7.77k
    map_field->data_type = std::make_shared<DataTypeMap>(make_nullable(key_field->data_type),
583
7.77k
                                                         make_nullable(value_field->data_type));
584
7.77k
    if (is_optional) {
585
7.11k
        map_field->data_type = make_nullable(map_field->data_type);
586
7.11k
    }
587
7.77k
    map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;
588
589
7.77k
    return Status::OK();
590
7.77k
}
591
592
Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas,
593
5.99k
                                           size_t curr_pos, FieldSchema* struct_field) {
594
    // the nested column in parquet, parse group to struct.
595
5.99k
    auto& struct_schema = t_schemas[curr_pos];
596
5.99k
    bool is_optional = is_optional_node(struct_schema);
597
5.99k
    if (is_optional) {
598
5.14k
        struct_field->definition_level++;
599
5.14k
    }
600
5.99k
    auto num_children = struct_schema.num_children;
601
5.99k
    struct_field->children.resize(num_children);
602
5.99k
    set_child_node_level(struct_field, struct_field->repeated_parent_def_level);
603
5.99k
    _next_schema_pos = curr_pos + 1;
604
20.5k
    for (int i = 0; i < num_children; ++i) {
605
14.5k
        RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
606
14.5k
    }
607
5.98k
    struct_field->name = struct_schema.name;
608
5.98k
    struct_field->lower_case_name = to_lower(struct_field->name);
609
5.98k
    struct_field->column_id = UNASSIGNED_COLUMN_ID; // Initialize column_id
610
611
5.98k
    struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
612
5.98k
    DataTypes res_data_types;
613
5.98k
    std::vector<String> names;
614
20.5k
    for (int i = 0; i < num_children; ++i) {
615
14.5k
        res_data_types.push_back(make_nullable(struct_field->children[i].data_type));
616
14.5k
        names.push_back(struct_field->children[i].name);
617
14.5k
    }
618
5.98k
    struct_field->data_type = std::make_shared<DataTypeStruct>(res_data_types, names);
619
5.98k
    if (is_optional) {
620
5.14k
        struct_field->data_type = make_nullable(struct_field->data_type);
621
5.14k
    }
622
5.98k
    return Status::OK();
623
5.99k
}
624
625
5
int FieldDescriptor::get_column_index(const std::string& column) const {
626
15
    for (int32_t i = 0; i < _fields.size(); i++) {
627
15
        if (_fields[i].name == column) {
628
5
            return i;
629
5
        }
630
15
    }
631
0
    return -1;
632
5
}
633
634
928k
FieldSchema* FieldDescriptor::get_column(const std::string& name) const {
635
928k
    auto it = _name_to_field.find(name);
636
930k
    if (it != _name_to_field.end()) {
637
930k
        return it->second;
638
930k
    }
639
18.4E
    throw Exception(Status::InternalError("Name {} not found in FieldDescriptor!", name));
640
0
    return nullptr;
641
928k
}
642
643
33.8k
void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const {
644
33.8k
    names->clear();
645
299k
    for (const FieldSchema& f : _fields) {
646
299k
        names->emplace(f.name);
647
299k
    }
648
33.8k
}
649
650
0
std::string FieldDescriptor::debug_string() const {
651
0
    std::stringstream ss;
652
0
    ss << "fields=[";
653
0
    for (int i = 0; i < _fields.size(); ++i) {
654
0
        if (i != 0) {
655
0
            ss << ", ";
656
0
        }
657
0
        ss << _fields[i].debug_string();
658
0
    }
659
0
    ss << "]";
660
0
    return ss.str();
661
0
}
662
663
28.5k
void FieldDescriptor::assign_ids() {
664
28.5k
    uint64_t next_id = 1;
665
202k
    for (auto& field : _fields) {
666
202k
        field.assign_ids(next_id);
667
202k
    }
668
28.5k
}
669
670
0
const FieldSchema* FieldDescriptor::find_column_by_id(uint64_t column_id) const {
671
0
    for (const auto& field : _fields) {
672
0
        if (auto result = field.find_column_by_id(column_id)) {
673
0
            return result;
674
0
        }
675
0
    }
676
0
    return nullptr;
677
0
}
678
679
291k
void FieldSchema::assign_ids(uint64_t& next_id) {
680
291k
    column_id = next_id++;
681
682
291k
    for (auto& child : children) {
683
89.1k
        child.assign_ids(next_id);
684
89.1k
    }
685
686
291k
    max_column_id = next_id - 1;
687
291k
}
688
689
0
const FieldSchema* FieldSchema::find_column_by_id(uint64_t target_id) const {
690
0
    if (column_id == target_id) {
691
0
        return this;
692
0
    }
693
694
0
    for (const auto& child : children) {
695
0
        if (auto result = child.find_column_by_id(target_id)) {
696
0
            return result;
697
0
        }
698
0
    }
699
700
0
    return nullptr;
701
0
}
702
703
259k
uint64_t FieldSchema::get_column_id() const {
704
259k
    return column_id;
705
259k
}
706
707
0
void FieldSchema::set_column_id(uint64_t id) {
708
0
    column_id = id;
709
0
}
710
711
32.1k
uint64_t FieldSchema::get_max_column_id() const {
712
32.1k
    return max_column_id;
713
32.1k
}
714
715
} // namespace doris