Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/table/parquet_utils.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/table/parquet_utils.h"
19
20
#include <fmt/format.h>
21
22
#include <algorithm>
23
#include <cctype>
24
#include <cstring>
25
#include <unordered_map>
26
#include <utility>
27
28
#include "core/assert_cast.h"
29
#include "core/column/column.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_string.h"
32
#include "core/column/column_vector.h"
33
#include "core/data_type/data_type_nullable.h"
34
#include "format/parquet/parquet_column_convert.h"
35
#include "util/string_util.h"
36
#include "util/unaligned.h"
37
38
namespace doris::parquet_utils {
39
namespace {
40
41
template <typename ColumnType, typename T>
42
5
void insert_numeric_impl(MutableColumnPtr& column, T value) {
43
5
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
44
1
        auto& nested = nullable_column->get_nested_column();
45
1
        assert_cast<ColumnType&>(nested).insert_value(value);
46
1
        nullable_column->push_false_to_nullmap(1);
47
4
    } else {
48
4
        assert_cast<ColumnType&>(*column).insert_value(value);
49
4
    }
50
5
}
parquet_utils.cpp:_ZN5doris13parquet_utils12_GLOBAL__N_119insert_numeric_implINS_12ColumnVectorILNS_13PrimitiveTypeE5EEEiEEvRNS_3COWINS_7IColumnEE11mutable_ptrIS7_EET0_
Line
Count
Source
42
2
void insert_numeric_impl(MutableColumnPtr& column, T value) {
43
2
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
44
1
        auto& nested = nullable_column->get_nested_column();
45
1
        assert_cast<ColumnType&>(nested).insert_value(value);
46
1
        nullable_column->push_false_to_nullmap(1);
47
1
    } else {
48
1
        assert_cast<ColumnType&>(*column).insert_value(value);
49
1
    }
50
2
}
parquet_utils.cpp:_ZN5doris13parquet_utils12_GLOBAL__N_119insert_numeric_implINS_12ColumnVectorILNS_13PrimitiveTypeE6EEElEEvRNS_3COWINS_7IColumnEE11mutable_ptrIS7_EET0_
Line
Count
Source
42
1
void insert_numeric_impl(MutableColumnPtr& column, T value) {
43
1
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
44
0
        auto& nested = nullable_column->get_nested_column();
45
0
        assert_cast<ColumnType&>(nested).insert_value(value);
46
0
        nullable_column->push_false_to_nullmap(1);
47
1
    } else {
48
1
        assert_cast<ColumnType&>(*column).insert_value(value);
49
1
    }
50
1
}
parquet_utils.cpp:_ZN5doris13parquet_utils12_GLOBAL__N_119insert_numeric_implINS_12ColumnVectorILNS_13PrimitiveTypeE2EEEhEEvRNS_3COWINS_7IColumnEE11mutable_ptrIS7_EET0_
Line
Count
Source
42
2
void insert_numeric_impl(MutableColumnPtr& column, T value) {
43
2
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
44
0
        auto& nested = nullable_column->get_nested_column();
45
0
        assert_cast<ColumnType&>(nested).insert_value(value);
46
0
        nullable_column->push_false_to_nullmap(1);
47
2
    } else {
48
2
        assert_cast<ColumnType&>(*column).insert_value(value);
49
2
    }
50
2
}
51
52
} // namespace
53
54
2
std::string join_path(const std::vector<std::string>& items) {
55
2
    return join(items, ".");
56
2
}
57
58
2
void insert_int32(MutableColumnPtr& column, Int32 value) {
59
2
    insert_numeric_impl<ColumnInt32>(column, value);
60
2
}
61
62
1
void insert_int64(MutableColumnPtr& column, Int64 value) {
63
1
    insert_numeric_impl<ColumnInt64>(column, value);
64
1
}
65
66
2
void insert_bool(MutableColumnPtr& column, bool value) {
67
2
    insert_numeric_impl<ColumnUInt8>(column, static_cast<UInt8>(value));
68
2
}
69
70
2
void insert_string(MutableColumnPtr& column, const std::string& value) {
71
2
    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
72
1
        nullable->get_null_map_data().push_back(0);
73
1
        auto& nested = nullable->get_nested_column();
74
1
        assert_cast<ColumnString&>(nested).insert_data(value.c_str(), value.size());
75
1
    } else {
76
1
        assert_cast<ColumnString&>(*column).insert_data(value.c_str(), value.size());
77
1
    }
78
2
}
79
80
2
void insert_null(MutableColumnPtr& column) {
81
2
    if (auto* nullable = check_and_get_column<ColumnNullable>(*column)) {
82
2
        nullable->get_null_map_data().push_back(1);
83
2
        nullable->get_nested_column().insert_default();
84
2
    } else {
85
0
        column->insert_default();
86
0
    }
87
2
}
88
89
2
std::string physical_type_to_string(tparquet::Type::type type) {
90
2
    switch (type) {
91
0
    case tparquet::Type::BOOLEAN:
92
0
        return "BOOLEAN";
93
1
    case tparquet::Type::INT32:
94
1
        return "INT32";
95
0
    case tparquet::Type::INT64:
96
0
        return "INT64";
97
0
    case tparquet::Type::INT96:
98
0
        return "INT96";
99
0
    case tparquet::Type::FLOAT:
100
0
        return "FLOAT";
101
0
    case tparquet::Type::DOUBLE:
102
0
        return "DOUBLE";
103
0
    case tparquet::Type::BYTE_ARRAY:
104
0
        return "BYTE_ARRAY";
105
0
    case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
106
0
        return "FIXED_LEN_BYTE_ARRAY";
107
1
    default:
108
1
        return "UNKNOWN";
109
2
    }
110
2
}
111
112
2
std::string compression_to_string(tparquet::CompressionCodec::type codec) {
113
2
    switch (codec) {
114
0
    case tparquet::CompressionCodec::UNCOMPRESSED:
115
0
        return "UNCOMPRESSED";
116
1
    case tparquet::CompressionCodec::SNAPPY:
117
1
        return "SNAPPY";
118
0
    case tparquet::CompressionCodec::GZIP:
119
0
        return "GZIP";
120
0
    case tparquet::CompressionCodec::LZO:
121
0
        return "LZO";
122
0
    case tparquet::CompressionCodec::BROTLI:
123
0
        return "BROTLI";
124
0
    case tparquet::CompressionCodec::LZ4:
125
0
        return "LZ4";
126
0
    case tparquet::CompressionCodec::ZSTD:
127
0
        return "ZSTD";
128
0
    case tparquet::CompressionCodec::LZ4_RAW:
129
0
        return "LZ4_RAW";
130
1
    default:
131
1
        return "UNKNOWN";
132
2
    }
133
2
}
134
135
3
std::string converted_type_to_string(tparquet::ConvertedType::type type) {
136
3
    switch (type) {
137
1
    case tparquet::ConvertedType::UTF8:
138
1
        return "UTF8";
139
0
    case tparquet::ConvertedType::MAP:
140
0
        return "MAP";
141
0
    case tparquet::ConvertedType::MAP_KEY_VALUE:
142
0
        return "MAP_KEY_VALUE";
143
0
    case tparquet::ConvertedType::LIST:
144
0
        return "LIST";
145
0
    case tparquet::ConvertedType::ENUM:
146
0
        return "ENUM";
147
0
    case tparquet::ConvertedType::DECIMAL:
148
0
        return "DECIMAL";
149
0
    case tparquet::ConvertedType::DATE:
150
0
        return "DATE";
151
0
    case tparquet::ConvertedType::TIME_MILLIS:
152
0
        return "TIME_MILLIS";
153
0
    case tparquet::ConvertedType::TIME_MICROS:
154
0
        return "TIME_MICROS";
155
0
    case tparquet::ConvertedType::TIMESTAMP_MILLIS:
156
0
        return "TIMESTAMP_MILLIS";
157
0
    case tparquet::ConvertedType::TIMESTAMP_MICROS:
158
0
        return "TIMESTAMP_MICROS";
159
0
    case tparquet::ConvertedType::UINT_8:
160
0
        return "UINT_8";
161
0
    case tparquet::ConvertedType::UINT_16:
162
0
        return "UINT_16";
163
1
    case tparquet::ConvertedType::UINT_32:
164
1
        return "UINT_32";
165
0
    case tparquet::ConvertedType::UINT_64:
166
0
        return "UINT_64";
167
0
    case tparquet::ConvertedType::INT_8:
168
0
        return "INT_8";
169
0
    case tparquet::ConvertedType::INT_16:
170
0
        return "INT_16";
171
0
    case tparquet::ConvertedType::INT_32:
172
0
        return "INT_32";
173
0
    case tparquet::ConvertedType::INT_64:
174
0
        return "INT_64";
175
0
    case tparquet::ConvertedType::JSON:
176
0
        return "JSON";
177
0
    case tparquet::ConvertedType::BSON:
178
0
        return "BSON";
179
0
    case tparquet::ConvertedType::INTERVAL:
180
0
        return "INTERVAL";
181
1
    default:
182
1
        return "UNKNOWN";
183
3
    }
184
3
}
185
186
3
std::string logical_type_to_string(const tparquet::SchemaElement& element) {
187
3
    if (element.__isset.logicalType) {
188
1
        const auto& logical = element.logicalType;
189
1
        if (logical.__isset.STRING) {
190
1
            return "STRING";
191
1
        } else if (logical.__isset.MAP) {
192
0
            return "MAP";
193
0
        } else if (logical.__isset.LIST) {
194
0
            return "LIST";
195
0
        } else if (logical.__isset.ENUM) {
196
0
            return "ENUM";
197
0
        } else if (logical.__isset.DECIMAL) {
198
0
            return "DECIMAL";
199
0
        } else if (logical.__isset.DATE) {
200
0
            return "DATE";
201
0
        } else if (logical.__isset.TIME) {
202
0
            return "TIME";
203
0
        } else if (logical.__isset.TIMESTAMP) {
204
0
            return "TIMESTAMP";
205
0
        } else if (logical.__isset.INTEGER) {
206
0
            return "INTEGER";
207
0
        } else if (logical.__isset.UNKNOWN) {
208
0
            return "UNKNOWN";
209
0
        } else if (logical.__isset.JSON) {
210
0
            return "JSON";
211
0
        } else if (logical.__isset.BSON) {
212
0
            return "BSON";
213
0
        } else if (logical.__isset.UUID) {
214
0
            return "UUID";
215
0
        } else if (logical.__isset.FLOAT16) {
216
0
            return "FLOAT16";
217
0
        } else if (logical.__isset.VARIANT) {
218
0
            return "VARIANT";
219
0
        } else if (logical.__isset.GEOMETRY) {
220
0
            return "GEOMETRY";
221
0
        } else if (logical.__isset.GEOGRAPHY) {
222
0
            return "GEOGRAPHY";
223
0
        }
224
1
    }
225
2
    if (element.__isset.converted_type) {
226
1
        return converted_type_to_string(element.converted_type);
227
1
    }
228
1
    return "";
229
2
}
230
231
1
std::string encodings_to_string(const std::vector<tparquet::Encoding::type>& encodings) {
232
1
    std::vector<std::string> parts;
233
1
    parts.reserve(encodings.size());
234
4
    for (auto encoding : encodings) {
235
4
        switch (encoding) {
236
1
        case tparquet::Encoding::PLAIN:
237
1
            parts.emplace_back("PLAIN");
238
1
            break;
239
0
        case tparquet::Encoding::PLAIN_DICTIONARY:
240
0
            parts.emplace_back("PLAIN_DICTIONARY");
241
0
            break;
242
0
        case tparquet::Encoding::RLE:
243
0
            parts.emplace_back("RLE");
244
0
            break;
245
0
        case tparquet::Encoding::BIT_PACKED:
246
0
            parts.emplace_back("BIT_PACKED");
247
0
            break;
248
0
        case tparquet::Encoding::DELTA_BINARY_PACKED:
249
0
            parts.emplace_back("DELTA_BINARY_PACKED");
250
0
            break;
251
0
        case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
252
0
            parts.emplace_back("DELTA_LENGTH_BYTE_ARRAY");
253
0
            break;
254
1
        case tparquet::Encoding::DELTA_BYTE_ARRAY:
255
1
            parts.emplace_back("DELTA_BYTE_ARRAY");
256
1
            break;
257
1
        case tparquet::Encoding::RLE_DICTIONARY:
258
1
            parts.emplace_back("RLE_DICTIONARY");
259
1
            break;
260
1
        default:
261
1
            parts.emplace_back("UNKNOWN");
262
1
            break;
263
4
        }
264
4
    }
265
1
    return fmt::format("{}", fmt::join(parts, ","));
266
1
}
267
268
bool try_get_statistics_encoded_value(const tparquet::Statistics& statistics, bool is_min,
269
5
                                      std::string* encoded_value) {
270
5
    if (is_min) {
271
3
        if (statistics.__isset.min_value) {
272
1
            *encoded_value = statistics.min_value;
273
1
            return true;
274
1
        }
275
2
        if (statistics.__isset.min) {
276
1
            *encoded_value = statistics.min;
277
1
            return true;
278
1
        }
279
2
    } else {
280
2
        if (statistics.__isset.max_value) {
281
1
            *encoded_value = statistics.max_value;
282
1
            return true;
283
1
        }
284
1
        if (statistics.__isset.max) {
285
1
            *encoded_value = statistics.max;
286
1
            return true;
287
1
        }
288
1
    }
289
1
    encoded_value->clear();
290
1
    return false;
291
5
}
292
293
2
std::string bytes_to_hex_string(const std::string& bytes) {
294
2
    static constexpr char kHexDigits[] = "0123456789ABCDEF";
295
2
    std::string hex;
296
2
    hex.resize(bytes.size() * 2);
297
8
    for (size_t i = 0; i < bytes.size(); ++i) {
298
6
        auto byte = static_cast<uint8_t>(bytes[i]);
299
6
        hex[i * 2] = kHexDigits[byte >> 4];
300
6
        hex[i * 2 + 1] = kHexDigits[byte & 0x0F];
301
6
    }
302
2
    return fmt::format("0x{}", hex);
303
2
}
304
305
std::string decode_statistics_value(const FieldSchema* schema_field,
306
                                    tparquet::Type::type physical_type,
307
3
                                    const std::string& encoded_value, const cctz::time_zone& ctz) {
308
3
    if (encoded_value.empty()) {
309
1
        return "";
310
1
    }
311
2
    if (schema_field == nullptr) {
312
1
        return bytes_to_hex_string(encoded_value);
313
1
    }
314
315
1
    auto logical_data_type = remove_nullable(schema_field->data_type);
316
1
    auto converter = parquet::PhysicalToLogicalConverter::get_converter(
317
1
            schema_field, logical_data_type, logical_data_type, &ctz);
318
1
    if (!converter || !converter->support()) {
319
0
        return bytes_to_hex_string(encoded_value);
320
0
    }
321
322
1
    ColumnPtr physical_column;
323
1
    switch (physical_type) {
324
0
    case tparquet::Type::type::BOOLEAN: {
325
0
        if (encoded_value.size() != sizeof(UInt8)) {
326
0
            return bytes_to_hex_string(encoded_value);
327
0
        }
328
0
        auto physical_col = ColumnUInt8::create();
329
0
        physical_col->insert_value(doris::unaligned_load<UInt8>(encoded_value.data()));
330
0
        physical_column = std::move(physical_col);
331
0
        break;
332
0
    }
333
1
    case tparquet::Type::type::INT32: {
334
1
        if (encoded_value.size() != sizeof(Int32)) {
335
0
            return bytes_to_hex_string(encoded_value);
336
0
        }
337
1
        auto physical_col = ColumnInt32::create();
338
1
        physical_col->insert_value(doris::unaligned_load<Int32>(encoded_value.data()));
339
1
        physical_column = std::move(physical_col);
340
1
        break;
341
1
    }
342
0
    case tparquet::Type::type::INT64: {
343
0
        if (encoded_value.size() != sizeof(Int64)) {
344
0
            return bytes_to_hex_string(encoded_value);
345
0
        }
346
0
        auto physical_col = ColumnInt64::create();
347
0
        physical_col->insert_value(doris::unaligned_load<Int64>(encoded_value.data()));
348
0
        physical_column = std::move(physical_col);
349
0
        break;
350
0
    }
351
0
    case tparquet::Type::type::FLOAT: {
352
0
        if (encoded_value.size() != sizeof(Float32)) {
353
0
            return bytes_to_hex_string(encoded_value);
354
0
        }
355
0
        auto physical_col = ColumnFloat32::create();
356
0
        physical_col->insert_value(doris::unaligned_load<Float32>(encoded_value.data()));
357
0
        physical_column = std::move(physical_col);
358
0
        break;
359
0
    }
360
0
    case tparquet::Type::type::DOUBLE: {
361
0
        if (encoded_value.size() != sizeof(Float64)) {
362
0
            return bytes_to_hex_string(encoded_value);
363
0
        }
364
0
        auto physical_col = ColumnFloat64::create();
365
0
        physical_col->insert_value(doris::unaligned_load<Float64>(encoded_value.data()));
366
0
        physical_column = std::move(physical_col);
367
0
        break;
368
0
    }
369
0
    case tparquet::Type::type::BYTE_ARRAY: {
370
0
        auto physical_col = ColumnString::create();
371
0
        physical_col->insert_data(encoded_value.data(), encoded_value.size());
372
0
        physical_column = std::move(physical_col);
373
0
        break;
374
0
    }
375
0
    case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: {
376
0
        int32_t type_length = schema_field->parquet_schema.__isset.type_length
377
0
                                      ? schema_field->parquet_schema.type_length
378
0
                                      : 0;
379
0
        if (type_length <= 0) {
380
0
            type_length = static_cast<int32_t>(encoded_value.size());
381
0
        }
382
0
        if (static_cast<size_t>(type_length) != encoded_value.size()) {
383
0
            return bytes_to_hex_string(encoded_value);
384
0
        }
385
0
        auto physical_col = ColumnUInt8::create();
386
0
        physical_col->resize(type_length);
387
0
        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
388
0
        physical_column = std::move(physical_col);
389
0
        break;
390
0
    }
391
0
    case tparquet::Type::type::INT96: {
392
0
        constexpr size_t kInt96Size = 12;
393
0
        if (encoded_value.size() != kInt96Size) {
394
0
            return bytes_to_hex_string(encoded_value);
395
0
        }
396
0
        auto physical_col = ColumnInt8::create();
397
0
        physical_col->resize(kInt96Size);
398
0
        memcpy(physical_col->get_data().data(), encoded_value.data(), encoded_value.size());
399
0
        physical_column = std::move(physical_col);
400
0
        break;
401
0
    }
402
0
    default:
403
0
        return bytes_to_hex_string(encoded_value);
404
1
    }
405
406
1
    ColumnPtr logical_column;
407
1
    if (converter->is_consistent()) {
408
1
        logical_column = physical_column;
409
1
    } else {
410
0
        logical_column = logical_data_type->create_column();
411
0
        if (Status st = converter->physical_convert(physical_column, logical_column); !st.ok()) {
412
0
            return bytes_to_hex_string(encoded_value);
413
0
        }
414
0
    }
415
416
1
    if (logical_column->size() != 1) {
417
0
        return bytes_to_hex_string(encoded_value);
418
0
    }
419
1
    DataTypeSerDe::FormatOptions options;
420
1
    options.timezone = &ctz;
421
1
    return logical_data_type->to_string(*logical_column, 0, options);
422
1
}
423
424
void build_path_map(const FieldSchema& field, const std::string& prefix,
425
4
                    std::unordered_map<std::string, const FieldSchema*>* map) {
426
4
    std::string current = prefix.empty() ? field.name : fmt::format("{}.{}", prefix, field.name);
427
4
    if (field.children.empty()) {
428
2
        (*map)[current] = &field;
429
2
    } else {
430
3
        for (const auto& child : field.children) {
431
3
            build_path_map(child, current, map);
432
3
        }
433
2
    }
434
4
}
435
436
#define MERGE_STATS_CASE(ParquetType)                                                     \
437
0
    case ParquetType: {                                                                   \
438
0
        auto typed_left_stat = std::static_pointer_cast<                                  \
439
0
                ::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(left);  \
440
0
        auto typed_right_stat = std::static_pointer_cast<                                 \
441
0
                ::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(right); \
442
0
        typed_left_stat->Merge(*typed_right_stat);                                        \
443
0
        return;                                                                           \
444
0
    }
445
446
void merge_stats(const std::shared_ptr<::parquet::Statistics>& left,
447
0
                 const std::shared_ptr<::parquet::Statistics>& right) {
448
0
    if (left == nullptr || right == nullptr) {
449
0
        return;
450
0
    }
451
0
    DCHECK(left->physical_type() == right->physical_type());
452
453
0
    switch (left->physical_type()) {
454
0
        MERGE_STATS_CASE(::parquet::Type::BOOLEAN);
455
0
        MERGE_STATS_CASE(::parquet::Type::INT32);
456
0
        MERGE_STATS_CASE(::parquet::Type::INT64);
457
0
        MERGE_STATS_CASE(::parquet::Type::INT96);
458
0
        MERGE_STATS_CASE(::parquet::Type::FLOAT);
459
0
        MERGE_STATS_CASE(::parquet::Type::DOUBLE);
460
0
        MERGE_STATS_CASE(::parquet::Type::BYTE_ARRAY);
461
0
        MERGE_STATS_CASE(::parquet::Type::FIXED_LEN_BYTE_ARRAY);
462
0
    default:
463
        LOG(WARNING) << "Unsupported parquet type for statistics merge: "
464
0
                     << static_cast<int>(left->physical_type());
465
0
        break;
466
0
    }
467
0
}
468
469
} // namespace doris::parquet_utils