Coverage Report

Created: 2026-05-17 18:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_column_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_column_reader.h"
19
20
#include <cctz/time_zone.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <rapidjson/document.h>
23
#include <sys/types.h>
24
25
#include <algorithm>
26
#include <deque>
27
#include <limits>
28
#include <map>
29
#include <set>
30
#include <string_view>
31
#include <utility>
32
#include <vector>
33
34
#include "common/exception.h"
35
#include "common/status.h"
36
#include "core/column/column.h"
37
#include "core/column/column_array.h"
38
#include "core/column/column_map.h"
39
#include "core/column/column_nullable.h"
40
#include "core/column/column_string.h"
41
#include "core/column/column_struct.h"
42
#include "core/column/column_varbinary.h"
43
#include "core/column/column_variant.h"
44
#include "core/data_type/data_type_array.h"
45
#include "core/data_type/data_type_factory.hpp"
46
#include "core/data_type/data_type_jsonb.h"
47
#include "core/data_type/data_type_map.h"
48
#include "core/data_type/data_type_nullable.h"
49
#include "core/data_type/data_type_number.h"
50
#include "core/data_type/data_type_string.h"
51
#include "core/data_type/data_type_struct.h"
52
#include "core/data_type/data_type_variant.h"
53
#include "core/data_type/define_primitive_type.h"
54
#include "core/data_type_serde/data_type_serde.h"
55
#include "core/string_buffer.hpp"
56
#include "core/value/jsonb_value.h"
57
#include "core/value/timestamptz_value.h"
58
#include "core/value/vdatetime_value.h"
59
#include "exec/common/variant_util.h"
60
#include "format/parquet/level_decoder.h"
61
#include "format/parquet/parquet_variant_reader.h"
62
#include "format/parquet/schema_desc.h"
63
#include "format/parquet/vparquet_column_chunk_reader.h"
64
#include "io/fs/tracing_file_reader.h"
65
#include "runtime/runtime_profile.h"
66
#include "util/jsonb_document.h"
67
68
namespace doris {
69
static void fill_struct_null_map(FieldSchema* field, NullMap& null_map,
70
                                 const std::vector<level_t>& rep_levels,
71
11
                                 const std::vector<level_t>& def_levels) {
72
11
    size_t num_levels = def_levels.size();
73
11
    DCHECK_EQ(num_levels, rep_levels.size());
74
11
    size_t origin_size = null_map.size();
75
11
    null_map.resize(origin_size + num_levels);
76
11
    size_t pos = origin_size;
77
26
    for (size_t i = 0; i < num_levels; ++i) {
78
        // skip the levels affect its ancestor or its descendants
79
15
        if (def_levels[i] < field->repeated_parent_def_level ||
80
15
            rep_levels[i] > field->repetition_level) {
81
0
            continue;
82
0
        }
83
15
        if (def_levels[i] >= field->definition_level) {
84
15
            null_map[pos++] = 0;
85
15
        } else {
86
0
            null_map[pos++] = 1;
87
0
        }
88
15
    }
89
11
    null_map.resize(pos);
90
11
}
91
92
static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offsets_data,
93
                              NullMap* null_map_ptr, const std::vector<level_t>& rep_levels,
94
2
                              const std::vector<level_t>& def_levels) {
95
2
    size_t num_levels = rep_levels.size();
96
2
    DCHECK_EQ(num_levels, def_levels.size());
97
2
    size_t origin_size = offsets_data.size();
98
2
    offsets_data.resize(origin_size + num_levels);
99
2
    if (null_map_ptr != nullptr) {
100
2
        null_map_ptr->resize(origin_size + num_levels);
101
2
    }
102
2
    size_t offset_pos = origin_size - 1;
103
8
    for (size_t i = 0; i < num_levels; ++i) {
104
        // skip the levels affect its ancestor or its descendants
105
6
        if (def_levels[i] < field->repeated_parent_def_level ||
106
6
            rep_levels[i] > field->repetition_level) {
107
0
            continue;
108
0
        }
109
6
        if (rep_levels[i] == field->repetition_level) {
110
4
            offsets_data[offset_pos]++;
111
4
            continue;
112
4
        }
113
2
        offset_pos++;
114
2
        offsets_data[offset_pos] = offsets_data[offset_pos - 1];
115
2
        if (def_levels[i] >= field->definition_level) {
116
2
            offsets_data[offset_pos]++;
117
2
        }
118
2
        if (def_levels[i] >= field->definition_level - 1) {
119
2
            (*null_map_ptr)[offset_pos] = 0;
120
2
        } else {
121
0
            (*null_map_ptr)[offset_pos] = 1;
122
0
        }
123
2
    }
124
2
    offsets_data.resize(offset_pos + 1);
125
2
    if (null_map_ptr != nullptr) {
126
2
        null_map_ptr->resize(offset_pos + 1);
127
2
    }
128
2
}
129
130
static constexpr int64_t UNIX_EPOCH_DAYNR = 719528;
131
static constexpr int64_t MICROS_PER_SECOND = 1000000;
132
133
0
static int64_t variant_date_value(const VecDateTimeValue& value) {
134
0
    return value.daynr() - UNIX_EPOCH_DAYNR;
135
0
}
136
137
1
static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) {
138
1
    return value.daynr() - UNIX_EPOCH_DAYNR;
139
1
}
140
141
0
static int64_t variant_datetime_value(const VecDateTimeValue& value) {
142
0
    int64_t timestamp = 0;
143
0
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
144
0
    return timestamp * MICROS_PER_SECOND;
145
0
}
146
147
1
static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& value) {
148
1
    int64_t timestamp = 0;
149
1
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
150
1
    return timestamp * MICROS_PER_SECOND + value.microsecond();
151
1
}
152
153
0
static int64_t variant_datetime_value(const TimestampTzValue& value) {
154
0
    int64_t timestamp = 0;
155
0
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
156
0
    return timestamp * MICROS_PER_SECOND + value.microsecond();
157
0
}
158
159
114
static int find_child_idx(const FieldSchema& field, std::string_view name) {
160
236
    for (int i = 0; i < field.children.size(); ++i) {
161
209
        if (field.children[i].lower_case_name == name) {
162
87
            return i;
163
87
        }
164
209
    }
165
27
    return -1;
166
114
}
167
168
0
static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) {
169
0
    auto type = remove_nullable(field.data_type);
170
0
    return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY;
171
0
}
172
173
20
static bool is_unannotated_variant_value_field(const FieldSchema& field) {
174
    // VARIANT residual value is raw binary; annotated strings named value are user fields.
175
20
    return field.lower_case_name == "value" && field.physical_type == tparquet::Type::BYTE_ARRAY &&
176
20
           !field.parquet_schema.__isset.logicalType &&
177
20
           !field.parquet_schema.__isset.converted_type;
178
20
}
179
180
2
static bool is_unannotated_variant_metadata_field(const FieldSchema& field) {
181
2
    return field.lower_case_name == "metadata" &&
182
2
           field.physical_type == tparquet::Type::BYTE_ARRAY &&
183
2
           !field.parquet_schema.__isset.logicalType &&
184
2
           !field.parquet_schema.__isset.converted_type;
185
2
}
186
187
static bool is_variant_wrapper_field(const FieldSchema& field,
188
95
                                     bool allow_scalar_typed_value_only_wrapper) {
189
95
    auto type = remove_nullable(field.data_type);
190
95
    if (type->get_primitive_type() != TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) {
191
76
        return false;
192
76
    }
193
194
19
    bool has_metadata = false;
195
19
    bool has_value = false;
196
19
    const FieldSchema* typed_value = nullptr;
197
25
    for (const auto& child : field.children) {
198
25
        if (child.lower_case_name == "metadata") {
199
2
            if (!is_unannotated_variant_metadata_field(child)) {
200
1
                return false;
201
1
            }
202
1
            has_metadata = true;
203
1
            continue;
204
2
        }
205
23
        if (child.lower_case_name == "value") {
206
9
            if (!is_unannotated_variant_value_field(child)) {
207
5
                return false;
208
5
            }
209
4
            has_value = true;
210
4
            continue;
211
9
        }
212
14
        if (child.lower_case_name == "typed_value") {
213
5
            typed_value = &child;
214
5
            continue;
215
5
        }
216
9
        return false;
217
14
    }
218
4
    if (has_metadata) {
219
1
        return type->get_primitive_type() == TYPE_VARIANT && (has_value || typed_value != nullptr);
220
1
    }
221
3
    if (has_value) {
222
3
        return typed_value != nullptr;
223
3
    }
224
0
    return typed_value != nullptr && (allow_scalar_typed_value_only_wrapper ||
225
0
                                      is_variant_wrapper_typed_value_child(*typed_value));
226
3
}
227
228
34
static bool is_value_only_variant_wrapper_candidate(const FieldSchema& field) {
229
34
    auto type = remove_nullable(field.data_type);
230
34
    if (type->get_primitive_type() != TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) {
231
23
        return false;
232
23
    }
233
234
11
    bool has_value = false;
235
11
    for (const auto& child : field.children) {
236
11
        if (is_unannotated_variant_value_field(child)) {
237
1
            has_value = true;
238
1
            continue;
239
1
        }
240
10
        return false;
241
11
    }
242
1
    return has_value;
243
11
}
244
245
49
static Status get_binary_field(const Field& field, std::string* value, bool* present) {
246
49
    if (field.is_null()) {
247
8
        *present = false;
248
8
        return Status::OK();
249
8
    }
250
41
    *present = true;
251
41
    switch (field.get_type()) {
252
41
    case TYPE_STRING:
253
41
        *value = field.get<TYPE_STRING>();
254
41
        return Status::OK();
255
0
    case TYPE_CHAR:
256
0
        *value = field.get<TYPE_CHAR>();
257
0
        return Status::OK();
258
0
    case TYPE_VARCHAR:
259
0
        *value = field.get<TYPE_VARCHAR>();
260
0
        return Status::OK();
261
0
    case TYPE_VARBINARY: {
262
0
        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
263
0
        value->assign(ref.data, ref.size);
264
0
        return Status::OK();
265
0
    }
266
0
    default:
267
0
        return Status::Corruption("Parquet VARIANT binary field has unexpected Doris type {}",
268
0
                                  field.get_type_name());
269
41
    }
270
41
}
271
272
5
static PathInData append_path(const PathInData& prefix, const PathInData& suffix) {
273
5
    if (prefix.empty()) {
274
5
        return suffix;
275
5
    }
276
0
    if (suffix.empty()) {
277
0
        return prefix;
278
0
    }
279
0
    PathInDataBuilder builder;
280
0
    builder.append(prefix.get_parts(), false);
281
0
    builder.append(suffix.get_parts(), false);
282
0
    return builder.build();
283
0
}
284
285
19
static Status make_jsonb_field(std::string_view json, FieldWithDataType* value) {
286
19
    JsonBinaryValue jsonb_value;
287
19
    RETURN_IF_ERROR(jsonb_value.from_json_string(json.data(), json.size()));
288
19
    value->field =
289
19
            Field::create_field<TYPE_JSONB>(JsonbField(jsonb_value.value(), jsonb_value.size()));
290
19
    value->base_scalar_type_id = TYPE_JSONB;
291
19
    value->num_dimensions = 0;
292
19
    value->precision = 0;
293
19
    value->scale = 0;
294
19
    return Status::OK();
295
19
}
296
297
1
static std::string make_null_array_json(size_t elements) {
298
1
    std::string json = "[";
299
2
    for (size_t i = 0; i < elements; ++i) {
300
1
        if (i != 0) {
301
0
            json.push_back(',');
302
0
        }
303
1
        json.append("null");
304
1
    }
305
1
    json.push_back(']');
306
1
    return json;
307
1
}
308
309
14
static Status make_empty_object_field(Field* field) {
310
14
    FieldWithDataType value;
311
14
    RETURN_IF_ERROR(make_jsonb_field("{}", &value));
312
14
    *field = std::move(value.field);
313
14
    return Status::OK();
314
14
}
315
316
static Status insert_jsonb_value(const PathInData& path, std::string_view json,
317
4
                                 VariantMap* values) {
318
4
    FieldWithDataType value;
319
4
    RETURN_IF_ERROR(make_jsonb_field(json, &value));
320
4
    (*values)[path] = std::move(value);
321
4
    return Status::OK();
322
4
}
323
324
4
static Status insert_empty_object_marker(const PathInData& path, VariantMap* values) {
325
4
    return insert_jsonb_value(path, "{}", values);
326
4
}
327
328
48
static bool is_empty_object_marker(const FieldWithDataType& value) {
329
48
    if (value.field.get_type() != TYPE_JSONB) {
330
37
        return false;
331
37
    }
332
11
    const auto& jsonb = value.field.get<TYPE_JSONB>();
333
11
    const JsonbDocument* document = nullptr;
334
11
    Status st =
335
11
            JsonbDocument::checkAndCreateDocument(jsonb.get_value(), jsonb.get_size(), &document);
336
11
    if (!st.ok() || document == nullptr || document->getValue() == nullptr ||
337
11
        !document->getValue()->isObject()) {
338
1
        return false;
339
1
    }
340
10
    return document->getValue()->unpack<ObjectVal>()->numElem() == 0;
341
11
}
342
343
static Status collect_empty_object_markers(const rapidjson::Value& value, PathInDataBuilder* path,
344
1
                                           VariantMap* values) {
345
1
    if (!value.IsObject()) {
346
0
        return Status::OK();
347
0
    }
348
1
    if (value.MemberCount() == 0) {
349
1
        return insert_empty_object_marker(path->build(), values);
350
1
    }
351
0
    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
352
0
        if (it->value.IsObject()) {
353
0
            path->append(std::string_view(it->name.GetString(), it->name.GetStringLength()), false);
354
0
            RETURN_IF_ERROR(collect_empty_object_markers(it->value, path, values));
355
0
            path->pop_back();
356
0
        }
357
0
    }
358
0
    return Status::OK();
359
0
}
360
361
static Status add_empty_object_markers_from_json(const std::string& json, const PathInData& prefix,
362
6
                                                 VariantMap* values) {
363
6
    if (json.find("{}") == std::string::npos) {
364
5
        return Status::OK();
365
5
    }
366
1
    rapidjson::Document document;
367
1
    document.Parse(json.data(), json.size());
368
1
    if (document.HasParseError()) {
369
0
        return Status::Corruption("Invalid Parquet VARIANT decoded JSON");
370
0
    }
371
1
    PathInDataBuilder path;
372
1
    path.append(prefix.get_parts(), false);
373
1
    return collect_empty_object_markers(document, &path, values);
374
1
}
375
376
static Status parse_json_to_variant_map(const std::string& json, const PathInData& prefix,
377
6
                                        VariantMap* values) {
378
6
    auto parsed_column = ColumnVariant::create(0, false);
379
6
    ParseConfig parse_config;
380
6
    StringRef json_ref(json.data(), json.size());
381
6
    RETURN_IF_CATCH_EXCEPTION(
382
6
            variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config));
383
6
    Field parsed = (*parsed_column)[0];
384
6
    if (!parsed.is_null()) {
385
5
        auto& parsed_values = parsed.get<TYPE_VARIANT>();
386
5
        for (auto& [path, value] : parsed_values) {
387
5
            (*values)[append_path(prefix, path)] = std::move(value);
388
5
        }
389
5
    }
390
6
    RETURN_IF_ERROR(add_empty_object_markers_from_json(json, prefix, values));
391
6
    return Status::OK();
392
6
}
393
394
2
static Status variant_map_to_json(VariantMap values, std::string* json) {
395
2
    auto variant_column = ColumnVariant::create(0, false);
396
2
    RETURN_IF_CATCH_EXCEPTION(
397
2
            variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
398
2
    DataTypeSerDe::FormatOptions options;
399
2
    variant_column->serialize_one_row_to_string(0, json, options);
400
2
    return Status::OK();
401
2
}
402
403
18
static bool path_has_prefix(const PathInData& path, const PathInData& prefix) {
404
18
    const auto& parts = path.get_parts();
405
18
    const auto& prefix_parts = prefix.get_parts();
406
18
    if (parts.size() < prefix_parts.size()) {
407
0
        return false;
408
0
    }
409
19
    for (size_t i = 0; i < prefix_parts.size(); ++i) {
410
1
        if (parts[i] != prefix_parts[i]) {
411
0
            return false;
412
0
        }
413
1
    }
414
18
    return true;
415
18
}
416
417
15
static bool has_descendant_path(const VariantMap& values, const PathInData& prefix) {
418
15
    const size_t prefix_size = prefix.get_parts().size();
419
15
    return std::ranges::any_of(values, [&](const auto& entry) {
420
12
        const auto& path = entry.first;
421
12
        return path.get_parts().size() > prefix_size && path_has_prefix(path, prefix);
422
12
    });
423
15
}
424
425
static void erase_shadowed_empty_object_markers(VariantMap* values,
426
62
                                                const VariantMap& shadowing_values) {
427
102
    for (auto it = values->begin(); it != values->end();) {
428
40
        if (is_empty_object_marker(it->second) &&
429
40
            (has_descendant_path(*values, it->first) ||
430
7
             has_descendant_path(shadowing_values, it->first))) {
431
3
            it = values->erase(it);
432
3
            continue;
433
3
        }
434
37
        ++it;
435
37
    }
436
62
}
437
438
static void erase_shadowed_empty_object_markers(VariantMap* value_values,
439
31
                                                VariantMap* typed_values) {
440
31
    erase_shadowed_empty_object_markers(value_values, *typed_values);
441
31
    erase_shadowed_empty_object_markers(typed_values, *value_values);
442
31
}
443
444
static Status check_no_shredded_value_typed_duplicates(const VariantMap& value_values,
445
                                                       const VariantMap& typed_values,
446
31
                                                       const PathInData& prefix) {
447
31
    const size_t prefix_size = prefix.get_parts().size();
448
31
    for (const auto& value_entry : value_values) {
449
11
        const auto& value_path = value_entry.first;
450
11
        if (!path_has_prefix(value_path, prefix)) {
451
0
            continue;
452
0
        }
453
11
        if (value_path.get_parts().size() == prefix_size) {
454
4
            if (is_empty_object_marker(value_entry.second) &&
455
4
                !has_descendant_path(typed_values, value_path)) {
456
1
                continue;
457
1
            }
458
3
            if (!typed_values.empty()) {
459
0
                return Status::Corruption(
460
0
                        "Parquet VARIANT residual value conflicts with typed_value at path {}",
461
0
                        value_path.get_path());
462
0
            }
463
3
            continue;
464
3
        }
465
7
        for (const auto& typed_entry : typed_values) {
466
4
            const auto& typed_path = typed_entry.first;
467
4
            if (!path_has_prefix(typed_path, prefix)) {
468
0
                continue;
469
0
            }
470
4
            if (typed_path.get_parts().size() == prefix_size) {
471
0
                if (is_empty_object_marker(typed_entry.second) &&
472
0
                    !has_descendant_path(value_values, typed_path)) {
473
0
                    continue;
474
0
                }
475
0
                return Status::Corruption(
476
0
                        "Parquet VARIANT residual value and typed_value contain duplicate field {}",
477
0
                        value_path.get_parts()[prefix_size].key);
478
0
            }
479
4
            if (value_path.get_parts()[prefix_size] == typed_path.get_parts()[prefix_size]) {
480
3
                if (value_path == typed_path && is_empty_object_marker(value_entry.second) &&
481
3
                    is_empty_object_marker(typed_entry.second)) {
482
1
                    continue;
483
1
                }
484
2
                return Status::Corruption(
485
2
                        "Parquet VARIANT residual value and typed_value contain duplicate field {}",
486
2
                        value_path.get_parts()[prefix_size].key);
487
3
            }
488
4
        }
489
7
    }
490
29
    return Status::OK();
491
31
}
492
493
70
static bool has_direct_typed_parent_null(const std::vector<const NullMap*>& null_maps, size_t row) {
494
95
    return std::ranges::any_of(null_maps, [&](const NullMap* null_map) {
495
95
        DCHECK_LT(row, null_map->size());
496
95
        return (*null_map)[row];
497
95
    });
498
70
}
499
500
static void insert_direct_typed_leaf_range(const IColumn& column, size_t start, size_t rows,
501
                                           const std::vector<const NullMap*>& parent_null_maps,
502
13
                                           IColumn* variant_leaf) {
503
13
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
504
13
    const IColumn* value_column = &column;
505
13
    const NullMap* leaf_null_map = nullptr;
506
13
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
507
0
        value_column = &nullable_column->get_nested_column();
508
0
        leaf_null_map = &nullable_column->get_null_map_data();
509
0
    }
510
511
13
    nullable_leaf.get_nested_column().insert_range_from(*value_column, start, rows);
512
13
    auto& null_map = nullable_leaf.get_null_map_data();
513
13
    null_map.reserve(null_map.size() + rows);
514
33
    for (size_t i = 0; i < rows; ++i) {
515
20
        const size_t row = start + i;
516
20
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
517
20
        null_map.push_back(leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row));
518
20
    }
519
13
}
520
521
77
static bool is_temporal_variant_leaf_type(PrimitiveType type) {
522
77
    switch (type) {
523
5
    case TYPE_TIMEV2:
524
5
    case TYPE_DATE:
525
5
    case TYPE_DATETIME:
526
9
    case TYPE_DATEV2:
527
11
    case TYPE_DATETIMEV2:
528
11
    case TYPE_TIMESTAMPTZ:
529
11
        return true;
530
66
    default:
531
66
        return false;
532
77
    }
533
77
}
534
535
17
static bool is_floating_point_variant_leaf_type(PrimitiveType type) {
536
17
    switch (type) {
537
0
    case TYPE_FLOAT:
538
5
    case TYPE_DOUBLE:
539
5
        return true;
540
12
    default:
541
12
        return false;
542
17
    }
543
17
}
544
545
static bool is_uuid_typed_value_field(const FieldSchema& field_schema);
546
static bool contains_uuid_typed_value_field(const FieldSchema& field_schema);
547
548
20
static DataTypePtr direct_variant_leaf_type(const DataTypePtr& data_type) {
549
20
    const auto& type = remove_nullable(data_type);
550
20
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
551
5
        return std::make_shared<DataTypeInt64>();
552
5
    }
553
15
    return type;
554
20
}
555
556
28
static DataTypePtr direct_variant_leaf_type(const FieldSchema& field_schema) {
557
28
    const auto& type = remove_nullable(field_schema.data_type);
558
28
    if (is_uuid_typed_value_field(field_schema)) {
559
3
        return std::make_shared<DataTypeString>();
560
3
    }
561
25
    if (type->get_primitive_type() == TYPE_ARRAY) {
562
5
        DORIS_CHECK(!field_schema.children.empty());
563
5
        DataTypePtr nested_type = direct_variant_leaf_type(field_schema.children[0]);
564
5
        if (field_schema.children[0].data_type->is_nullable()) {
565
5
            nested_type = make_nullable(nested_type);
566
5
        }
567
5
        return std::make_shared<DataTypeArray>(nested_type);
568
5
    }
569
20
    return direct_variant_leaf_type(field_schema.data_type);
570
25
}
571
572
16
static bool contains_temporal_variant_leaf_type(const DataTypePtr& data_type) {
573
16
    const auto& type = remove_nullable(data_type);
574
16
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
575
1
        return true;
576
1
    }
577
15
    if (type->get_primitive_type() == TYPE_ARRAY) {
578
8
        return contains_temporal_variant_leaf_type(
579
8
                assert_cast<const DataTypeArray*>(type.get())->get_nested_type());
580
8
    }
581
7
    return false;
582
15
}
583
584
14
static bool contains_floating_point_variant_leaf_type(const DataTypePtr& data_type) {
585
14
    const auto& type = remove_nullable(data_type);
586
14
    if (is_floating_point_variant_leaf_type(type->get_primitive_type())) {
587
2
        return true;
588
2
    }
589
12
    if (type->get_primitive_type() == TYPE_ARRAY) {
590
7
        return contains_floating_point_variant_leaf_type(
591
7
                assert_cast<const DataTypeArray*>(type.get())->get_nested_type());
592
7
    }
593
5
    return false;
594
12
}
595
596
static int64_t direct_temporal_variant_value(PrimitiveType type, const IColumn& column,
597
3
                                             size_t row) {
598
3
    switch (type) {
599
1
    case TYPE_TIMEV2:
600
1
        return static_cast<int64_t>(
601
1
                std::llround(assert_cast<const ColumnTimeV2&>(column).get_data()[row]));
602
0
    case TYPE_DATE:
603
0
        return variant_date_value(assert_cast<const ColumnDate&>(column).get_data()[row]);
604
0
    case TYPE_DATETIME:
605
0
        return variant_datetime_value(assert_cast<const ColumnDateTime&>(column).get_data()[row]);
606
1
    case TYPE_DATEV2:
607
1
        return variant_date_value(assert_cast<const ColumnDateV2&>(column).get_data()[row]);
608
1
    case TYPE_DATETIMEV2:
609
1
        return variant_datetime_value(assert_cast<const ColumnDateTimeV2&>(column).get_data()[row]);
610
0
    case TYPE_TIMESTAMPTZ:
611
0
        return variant_datetime_value(
612
0
                assert_cast<const ColumnTimeStampTz&>(column).get_data()[row]);
613
0
    default:
614
0
        DORIS_CHECK(false);
615
0
        return 0;
616
3
    }
617
3
}
618
619
static void insert_direct_typed_temporal_leaf_range(
620
        PrimitiveType type, const IColumn& column, size_t start, size_t rows,
621
4
        const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) {
622
4
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
623
4
    const IColumn* value_column = &column;
624
4
    const NullMap* leaf_null_map = nullptr;
625
4
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
626
0
        value_column = &nullable_column->get_nested_column();
627
0
        leaf_null_map = &nullable_column->get_null_map_data();
628
0
    }
629
630
4
    auto& data = assert_cast<ColumnInt64&>(nullable_leaf.get_nested_column()).get_data();
631
4
    data.reserve(data.size() + rows);
632
4
    auto& null_map = nullable_leaf.get_null_map_data();
633
4
    null_map.reserve(null_map.size() + rows);
634
8
    for (size_t i = 0; i < rows; ++i) {
635
4
        const size_t row = start + i;
636
4
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
637
4
        const bool is_null = leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row);
638
4
        if (is_null) {
639
1
            data.push_back(0);
640
1
            null_map.push_back(1);
641
1
            continue;
642
1
        }
643
3
        data.push_back(direct_temporal_variant_value(type, *value_column, row));
644
3
        null_map.push_back(0);
645
3
    }
646
4
}
647
648
static Status insert_direct_typed_uuid_leaf_range(
649
        const IColumn& column, size_t start, size_t rows,
650
1
        const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) {
651
1
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
652
1
    const IColumn* value_column = &column;
653
1
    const NullMap* leaf_null_map = nullptr;
654
1
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
655
0
        value_column = &nullable_column->get_nested_column();
656
0
        leaf_null_map = &nullable_column->get_null_map_data();
657
0
    }
658
659
1
    auto& data = assert_cast<ColumnString&>(nullable_leaf.get_nested_column());
660
1
    auto& null_map = nullable_leaf.get_null_map_data();
661
1
    null_map.reserve(null_map.size() + rows);
662
3
    for (size_t i = 0; i < rows; ++i) {
663
2
        const size_t row = start + i;
664
2
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
665
2
        const bool is_null = leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row);
666
2
        if (is_null) {
667
1
            data.insert_default();
668
1
            null_map.push_back(1);
669
1
            continue;
670
1
        }
671
1
        StringRef bytes = value_column->get_data_at(row);
672
1
        if (bytes.size != 16) {
673
0
            return Status::Corruption("Parquet VARIANT UUID typed_value has invalid length {}",
674
0
                                      bytes.size);
675
0
        }
676
1
        std::string uuid =
677
1
                parquet::format_variant_uuid(reinterpret_cast<const uint8_t*>(bytes.data));
678
1
        data.insert_data(uuid.data(), uuid.size());
679
1
        null_map.push_back(0);
680
1
    }
681
1
    return Status::OK();
682
1
}
683
684
3
static void append_json_string(std::string_view value, std::string* json) {
685
3
    auto column = ColumnString::create();
686
3
    VectorBufferWriter writer(*column);
687
3
    writer.write_json_string(value);
688
3
    writer.commit();
689
3
    json->append(column->get_data_at(0).data, column->get_data_at(0).size);
690
3
}
691
692
static bool is_column_selected(const FieldSchema& field_schema,
693
136
                               const std::set<uint64_t>& column_ids) {
694
136
    return column_ids.empty() || column_ids.find(field_schema.get_column_id()) != column_ids.end();
695
136
}
696
697
static bool has_selected_column(const FieldSchema& field_schema,
698
136
                                const std::set<uint64_t>& column_ids) {
699
136
    if (is_column_selected(field_schema, column_ids)) {
700
123
        return true;
701
123
    }
702
13
    return std::any_of(field_schema.children.begin(), field_schema.children.end(),
703
13
                       [&column_ids](const FieldSchema& child) {
704
10
                           return has_selected_column(child, column_ids);
705
10
                       });
706
136
}
707
708
45
static bool is_direct_variant_leaf_type(const DataTypePtr& data_type) {
709
45
    const auto& type = remove_nullable(data_type);
710
45
    switch (type->get_primitive_type()) {
711
0
    case TYPE_BOOLEAN:
712
0
    case TYPE_TINYINT:
713
0
    case TYPE_SMALLINT:
714
4
    case TYPE_INT:
715
13
    case TYPE_BIGINT:
716
13
    case TYPE_LARGEINT:
717
13
    case TYPE_DECIMALV2:
718
13
    case TYPE_DECIMAL32:
719
14
    case TYPE_DECIMAL64:
720
14
    case TYPE_DECIMAL128I:
721
14
    case TYPE_DECIMAL256:
722
17
    case TYPE_FLOAT:
723
23
    case TYPE_DOUBLE:
724
23
    case TYPE_STRING:
725
23
    case TYPE_CHAR:
726
23
    case TYPE_VARCHAR:
727
27
    case TYPE_VARBINARY:
728
27
        return true;
729
2
    case TYPE_TIMEV2:
730
2
    case TYPE_DATE:
731
2
    case TYPE_DATETIME:
732
3
    case TYPE_DATEV2:
733
4
    case TYPE_DATETIMEV2:
734
4
    case TYPE_TIMESTAMPTZ:
735
4
        return true;
736
10
    case TYPE_ARRAY: {
737
10
        const auto* array_type = assert_cast<const DataTypeArray*>(type.get());
738
10
        return is_direct_variant_leaf_type(array_type->get_nested_type());
739
4
    }
740
4
    default:
741
4
        return false;
742
45
    }
743
45
}
744
745
static bool can_direct_read_typed_value(const FieldSchema& field_schema, bool allow_variant_wrapper,
746
41
                                        const std::set<uint64_t>& column_ids) {
747
41
    if (!has_selected_column(field_schema, column_ids)) {
748
0
        return true;
749
0
    }
750
41
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
751
0
        const int value_idx = find_child_idx(field_schema, "value");
752
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
753
0
        return (value_idx < 0 ||
754
0
                !has_selected_column(field_schema.children[value_idx], column_ids)) &&
755
0
               typed_value_idx >= 0 &&
756
0
               can_direct_read_typed_value(field_schema.children[typed_value_idx], false,
757
0
                                           column_ids);
758
0
    }
759
760
41
    const auto& type = remove_nullable(field_schema.data_type);
761
41
    if (type->get_primitive_type() == TYPE_STRUCT) {
762
16
        return std::all_of(field_schema.children.begin(), field_schema.children.end(),
763
23
                           [&column_ids](const FieldSchema& child) {
764
23
                               return can_direct_read_typed_value(child, true, column_ids);
765
23
                           });
766
16
    }
767
25
    return is_direct_variant_leaf_type(field_schema.data_type);
768
41
}
769
770
static bool has_selected_direct_typed_leaf(const FieldSchema& field_schema,
771
                                           bool allow_variant_wrapper,
772
11
                                           const std::set<uint64_t>& column_ids) {
773
11
    if (!has_selected_column(field_schema, column_ids)) {
774
2
        return false;
775
2
    }
776
9
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
777
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
778
0
        DCHECK_GE(typed_value_idx, 0);
779
0
        return has_selected_direct_typed_leaf(field_schema.children[typed_value_idx], false,
780
0
                                              column_ids);
781
0
    }
782
783
9
    const auto& type = remove_nullable(field_schema.data_type);
784
9
    if (type->get_primitive_type() == TYPE_STRUCT) {
785
5
        return std::any_of(field_schema.children.begin(), field_schema.children.end(),
786
5
                           [&column_ids](const FieldSchema& child) {
787
5
                               return has_selected_direct_typed_leaf(child, true, column_ids);
788
5
                           });
789
5
    }
790
4
    return is_direct_variant_leaf_type(field_schema.data_type);
791
9
}
792
793
static bool can_use_direct_typed_only_value(const FieldSchema& variant_field,
794
8
                                            const std::set<uint64_t>& column_ids) {
795
8
    const int value_idx = find_child_idx(variant_field, "value");
796
8
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
797
8
    return (value_idx < 0 || !has_selected_column(variant_field.children[value_idx], column_ids)) &&
798
8
           typed_value_idx >= 0 &&
799
8
           has_selected_direct_typed_leaf(variant_field.children[typed_value_idx], false,
800
6
                                          column_ids) &&
801
8
           can_direct_read_typed_value(variant_field.children[typed_value_idx], false, column_ids);
802
8
}
803
804
3
static DataTypePtr make_variant_struct_reader_type(const FieldSchema& field) {
805
3
    DataTypes child_types;
806
3
    Strings child_names;
807
3
    child_types.reserve(field.children.size());
808
3
    child_names.reserve(field.children.size());
809
6
    for (const auto& child : field.children) {
810
6
        child_types.push_back(make_nullable(child.data_type));
811
6
        child_names.push_back(child.name);
812
6
    }
813
3
    return std::make_shared<DataTypeStruct>(child_types, child_names);
814
3
}
815
816
static ColumnPtr make_variant_struct_read_column(const FieldSchema& field,
817
2
                                                 const DataTypePtr& variant_struct_type) {
818
2
    if (field.data_type->is_nullable()) {
819
1
        return make_nullable(variant_struct_type)->create_column();
820
1
    }
821
1
    return variant_struct_type->create_column();
822
2
}
823
824
28
static void fill_variant_field_info(FieldWithDataType* value) {
825
28
    FieldInfo info;
826
28
    variant_util::get_field_info(value->field, &info);
827
28
    DCHECK_LE(info.num_dimensions, std::numeric_limits<uint8_t>::max());
828
28
    value->base_scalar_type_id = info.scalar_type_id;
829
28
    value->num_dimensions = static_cast<uint8_t>(info.num_dimensions);
830
28
}
831
832
20
static void fill_variant_leaf_type_info(const DataTypePtr& data_type, FieldWithDataType* value) {
833
20
    auto leaf_type = remove_nullable(data_type);
834
20
    size_t num_dimensions = 0;
835
21
    while (leaf_type->get_primitive_type() == TYPE_ARRAY) {
836
1
        ++num_dimensions;
837
1
        leaf_type = remove_nullable(
838
1
                assert_cast<const DataTypeArray*>(leaf_type.get())->get_nested_type());
839
1
    }
840
20
    DCHECK_LE(num_dimensions, std::numeric_limits<uint8_t>::max());
841
20
    if (value->base_scalar_type_id == INVALID_TYPE) {
842
0
        value->base_scalar_type_id = leaf_type->get_primitive_type();
843
0
    }
844
20
    if (value->num_dimensions == 0 && num_dimensions > 0) {
845
0
        value->num_dimensions = static_cast<uint8_t>(num_dimensions);
846
0
    }
847
20
    if (is_decimal(leaf_type->get_primitive_type())) {
848
1
        value->precision = leaf_type->get_precision();
849
1
        value->scale = leaf_type->get_scale();
850
1
    }
851
20
}
852
853
3
static Status fill_floating_point_variant_field(const Field& field, FieldWithDataType* value) {
854
3
    value->field = field;
855
3
    fill_variant_field_info(value);
856
3
    return Status::OK();
857
3
}
858
859
static Status fill_floating_point_variant_field(PrimitiveType type, const Field& field,
860
3
                                                FieldWithDataType* value) {
861
3
    DORIS_CHECK(type == TYPE_FLOAT || type == TYPE_DOUBLE);
862
3
    return fill_floating_point_variant_field(field, value);
863
3
}
864
865
102
static bool is_uuid_typed_value_field(const FieldSchema& field_schema) {
866
102
    return field_schema.parquet_schema.__isset.logicalType &&
867
102
           field_schema.parquet_schema.logicalType.__isset.UUID;
868
102
}
869
870
26
static bool contains_uuid_typed_value_field(const FieldSchema& field_schema) {
871
26
    return is_uuid_typed_value_field(field_schema) ||
872
26
           std::any_of(
873
24
                   field_schema.children.begin(), field_schema.children.end(),
874
24
                   [](const FieldSchema& child) { return contains_uuid_typed_value_field(child); });
875
26
}
876
877
3
static Status uuid_field_to_string(const Field& field, std::string* uuid) {
878
3
    StringRef bytes;
879
3
    switch (field.get_type()) {
880
0
    case TYPE_STRING:
881
0
        bytes = StringRef(field.get<TYPE_STRING>());
882
0
        break;
883
0
    case TYPE_CHAR:
884
0
        bytes = StringRef(field.get<TYPE_CHAR>());
885
0
        break;
886
0
    case TYPE_VARCHAR:
887
0
        bytes = StringRef(field.get<TYPE_VARCHAR>());
888
0
        break;
889
3
    case TYPE_VARBINARY:
890
3
        bytes = field.get<TYPE_VARBINARY>().to_string_ref();
891
3
        break;
892
0
    default:
893
0
        return Status::Corruption("Parquet VARIANT UUID typed_value has unexpected Doris type {}",
894
0
                                  field.get_type_name());
895
3
    }
896
3
    if (bytes.size != 16) {
897
0
        return Status::Corruption("Parquet VARIANT UUID typed_value has invalid length {}",
898
0
                                  bytes.size);
899
0
    }
900
3
    *uuid = parquet::format_variant_uuid(reinterpret_cast<const uint8_t*>(bytes.data));
901
3
    return Status::OK();
902
3
}
903
904
3
static Status fill_uuid_variant_field(const Field& field, FieldWithDataType* value) {
905
3
    std::string uuid;
906
3
    RETURN_IF_ERROR(uuid_field_to_string(field, &uuid));
907
3
    value->field = Field::create_field<TYPE_STRING>(std::move(uuid));
908
3
    value->base_scalar_type_id = TYPE_STRING;
909
3
    return Status::OK();
910
3
}
911
912
static Status fill_temporal_variant_field(PrimitiveType type, const Field& field,
913
1
                                          FieldWithDataType* value) {
914
1
    switch (type) {
915
1
    case TYPE_TIMEV2:
916
1
        value->field = Field::create_field<TYPE_BIGINT>(
917
1
                static_cast<int64_t>(std::llround(field.get<TYPE_TIMEV2>())));
918
1
        value->base_scalar_type_id = TYPE_BIGINT;
919
1
        return Status::OK();
920
0
    case TYPE_DATE:
921
0
        value->field = Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATE>()));
922
0
        value->base_scalar_type_id = TYPE_BIGINT;
923
0
        return Status::OK();
924
0
    case TYPE_DATETIME:
925
0
        value->field = Field::create_field<TYPE_BIGINT>(
926
0
                variant_datetime_value(field.get<TYPE_DATETIME>()));
927
0
        value->base_scalar_type_id = TYPE_BIGINT;
928
0
        return Status::OK();
929
0
    case TYPE_DATEV2:
930
0
        value->field =
931
0
                Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATEV2>()));
932
0
        value->base_scalar_type_id = TYPE_BIGINT;
933
0
        return Status::OK();
934
0
    case TYPE_DATETIMEV2:
935
0
        value->field = Field::create_field<TYPE_BIGINT>(
936
0
                variant_datetime_value(field.get<TYPE_DATETIMEV2>()));
937
0
        value->base_scalar_type_id = TYPE_BIGINT;
938
0
        return Status::OK();
939
0
    case TYPE_TIMESTAMPTZ:
940
0
        value->field = Field::create_field<TYPE_BIGINT>(
941
0
                variant_datetime_value(field.get<TYPE_TIMESTAMPTZ>()));
942
0
        value->base_scalar_type_id = TYPE_BIGINT;
943
0
        return Status::OK();
944
0
    default:
945
0
        DORIS_CHECK(false);
946
0
        return Status::OK();
947
1
    }
948
1
}
949
950
1
static uint8_t direct_array_dimensions(const DataTypePtr& data_type) {
951
1
    uint8_t num_dimensions = 0;
952
1
    auto type = remove_nullable(data_type);
953
2
    while (type->get_primitive_type() == TYPE_ARRAY) {
954
1
        ++num_dimensions;
955
1
        type = remove_nullable(assert_cast<const DataTypeArray*>(type.get())->get_nested_type());
956
1
    }
957
1
    return num_dimensions;
958
1
}
959
960
1
static PrimitiveType direct_array_base_scalar_type(const FieldSchema& field_schema) {
961
1
    auto leaf_type = remove_nullable(direct_variant_leaf_type(field_schema));
962
2
    while (leaf_type->get_primitive_type() == TYPE_ARRAY) {
963
1
        leaf_type = remove_nullable(
964
1
                assert_cast<const DataTypeArray*>(leaf_type.get())->get_nested_type());
965
1
    }
966
1
    return leaf_type->get_primitive_type();
967
1
}
968
969
static Status convert_direct_array_value(const FieldSchema& field_schema, const Field& field,
970
14
                                         Field* converted) {
971
14
    if (field.is_null()) {
972
3
        *converted = Field();
973
3
        return Status::OK();
974
3
    }
975
976
11
    const auto& type = remove_nullable(field_schema.data_type);
977
11
    if (type->get_primitive_type() == TYPE_ARRAY) {
978
5
        if (field_schema.children.empty()) {
979
0
            return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
980
0
        }
981
5
        Array converted_elements;
982
5
        const auto& elements = field.get<TYPE_ARRAY>();
983
5
        converted_elements.reserve(elements.size());
984
9
        for (const auto& element : elements) {
985
9
            Field converted_element;
986
9
            RETURN_IF_ERROR(convert_direct_array_value(field_schema.children[0], element,
987
9
                                                       &converted_element));
988
9
            converted_elements.push_back(std::move(converted_element));
989
9
        }
990
5
        *converted = Field::create_field<TYPE_ARRAY>(std::move(converted_elements));
991
5
        return Status::OK();
992
5
    }
993
994
6
    if (is_uuid_typed_value_field(field_schema)) {
995
2
        FieldWithDataType value;
996
2
        RETURN_IF_ERROR(fill_uuid_variant_field(field, &value));
997
2
        *converted = std::move(value.field);
998
2
        return Status::OK();
999
2
    }
1000
4
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
1001
1
        FieldWithDataType value;
1002
1
        RETURN_IF_ERROR(fill_temporal_variant_field(type->get_primitive_type(), field, &value));
1003
1
        *converted = std::move(value.field);
1004
1
        return Status::OK();
1005
1
    }
1006
3
    if (is_floating_point_variant_leaf_type(type->get_primitive_type())) {
1007
3
        FieldWithDataType value;
1008
3
        RETURN_IF_ERROR(
1009
3
                fill_floating_point_variant_field(type->get_primitive_type(), field, &value));
1010
3
        *converted = std::move(value.field);
1011
3
        return Status::OK();
1012
3
    }
1013
1014
0
    *converted = field;
1015
0
    return Status::OK();
1016
3
}
1017
1018
static Status insert_direct_typed_array_leaf_range(
1019
        const FieldSchema& field_schema, const IColumn& column, size_t start, size_t rows,
1020
4
        const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) {
1021
4
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
1022
4
    const IColumn* value_column = &column;
1023
4
    const NullMap* leaf_null_map = nullptr;
1024
4
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
1025
0
        value_column = &nullable_column->get_nested_column();
1026
0
        leaf_null_map = &nullable_column->get_null_map_data();
1027
0
    }
1028
1029
4
    auto& data = nullable_leaf.get_nested_column();
1030
4
    auto& null_map = nullable_leaf.get_null_map_data();
1031
4
    null_map.reserve(null_map.size() + rows);
1032
11
    for (size_t i = 0; i < rows; ++i) {
1033
7
        const size_t row = start + i;
1034
7
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
1035
7
        const bool is_null = leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row);
1036
7
        if (is_null) {
1037
3
            data.insert_default();
1038
3
            null_map.push_back(1);
1039
3
            continue;
1040
3
        }
1041
1042
4
        Field field;
1043
4
        value_column->get(row, field);
1044
4
        Field converted;
1045
4
        RETURN_IF_ERROR(convert_direct_array_value(field_schema, field, &converted));
1046
4
        data.insert(converted);
1047
4
        null_map.push_back(0);
1048
4
    }
1049
4
    return Status::OK();
1050
4
}
1051
1052
static Status fill_direct_array_variant_field(const FieldSchema& field_schema, const Field& field,
1053
1
                                              FieldWithDataType* value, bool* present) {
1054
1
    if (field.is_null()) {
1055
0
        *present = false;
1056
0
        return Status::OK();
1057
0
    }
1058
1
    *present = true;
1059
1
    RETURN_IF_ERROR(convert_direct_array_value(field_schema, field, &value->field));
1060
1
    value->base_scalar_type_id = direct_array_base_scalar_type(field_schema);
1061
1
    value->num_dimensions = direct_array_dimensions(field_schema.data_type);
1062
1
    return Status::OK();
1063
1
}
1064
1065
static Status field_to_variant_field(const FieldSchema& field_schema, const Field& field,
1066
21
                                     FieldWithDataType* value, bool* present) {
1067
21
    if (field.is_null()) {
1068
0
        *present = false;
1069
0
        return Status::OK();
1070
0
    }
1071
21
    *present = true;
1072
21
    if (is_uuid_typed_value_field(field_schema)) {
1073
1
        return fill_uuid_variant_field(field, value);
1074
1
    }
1075
20
    const DataTypePtr& type = remove_nullable(field_schema.data_type);
1076
20
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
1077
0
        return fill_temporal_variant_field(type->get_primitive_type(), field, value);
1078
0
    }
1079
20
    switch (type->get_primitive_type()) {
1080
0
    case TYPE_BOOLEAN:
1081
0
    case TYPE_TINYINT:
1082
0
    case TYPE_SMALLINT:
1083
9
    case TYPE_INT:
1084
12
    case TYPE_BIGINT:
1085
12
    case TYPE_LARGEINT:
1086
12
    case TYPE_DECIMALV2:
1087
12
    case TYPE_DECIMAL32:
1088
12
    case TYPE_DECIMAL64:
1089
12
    case TYPE_DECIMAL128I:
1090
12
    case TYPE_DECIMAL256:
1091
17
    case TYPE_STRING:
1092
17
    case TYPE_CHAR:
1093
17
    case TYPE_VARCHAR:
1094
19
    case TYPE_VARBINARY:
1095
20
    case TYPE_ARRAY:
1096
20
        value->field = field;
1097
20
        fill_variant_field_info(value);
1098
20
        fill_variant_leaf_type_info(type, value);
1099
20
        return Status::OK();
1100
0
    case TYPE_FLOAT:
1101
0
    case TYPE_DOUBLE:
1102
0
        return fill_floating_point_variant_field(field, value);
1103
0
    default:
1104
0
        return Status::Corruption("Unsupported Parquet VARIANT typed_value Doris type {}",
1105
0
                                  type->get_name());
1106
20
    }
1107
20
}
1108
1109
static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field,
1110
                                  const std::string& metadata, std::string* json, bool* present);
1111
static Status typed_map_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1112
                                       const std::string& metadata, PathInDataBuilder* path,
1113
                                       VariantMap* values, bool* present,
1114
                                       std::deque<std::string>* string_values);
1115
1116
static Status serialize_field_to_json(const DataTypePtr& data_type, const Field& field,
1117
3
                                      std::string* json) {
1118
3
    MutableColumnPtr column = data_type->create_column();
1119
3
    column->insert(field);
1120
1121
3
    auto json_column = ColumnString::create();
1122
3
    VectorBufferWriter writer(*json_column);
1123
3
    auto serde = data_type->get_serde();
1124
3
    DataTypeSerDe::FormatOptions options;
1125
3
    RETURN_IF_ERROR(serde->serialize_one_cell_to_json(*column, 0, writer, options));
1126
3
    writer.commit();
1127
3
    *json = json_column->get_data_at(0).to_string();
1128
3
    return Status::OK();
1129
3
}
1130
1131
static Status scalar_typed_value_to_json(const FieldSchema& field_schema, const Field& field,
1132
3
                                         std::string* json, bool* present) {
1133
3
    FieldWithDataType value;
1134
3
    RETURN_IF_ERROR(field_to_variant_field(field_schema, field, &value, present));
1135
3
    if (!*present) {
1136
0
        return Status::OK();
1137
0
    }
1138
3
    if (value.field.is_null()) {
1139
0
        *json = "null";
1140
0
        return Status::OK();
1141
0
    }
1142
3
    if (!is_uuid_typed_value_field(field_schema) &&
1143
3
        remove_nullable(field_schema.data_type)->get_primitive_type() == TYPE_VARBINARY) {
1144
0
        return Status::NotSupported(
1145
0
                "Parquet VARIANT binary typed_value cannot be serialized to JSON");
1146
0
    }
1147
1148
3
    DataTypePtr json_type;
1149
3
    if (value.base_scalar_type_id != PrimitiveType::INVALID_TYPE) {
1150
3
        json_type = DataTypeFactory::instance().create_data_type(value.base_scalar_type_id, false,
1151
3
                                                                 value.precision, value.scale);
1152
3
    } else {
1153
0
        json_type = remove_nullable(field_schema.data_type);
1154
0
    }
1155
3
    return serialize_field_to_json(json_type, value.field, json);
1156
3
}
1157
1158
static Status resolve_variant_metadata(const FieldSchema& variant_field, const Struct& fields,
1159
                                       const std::string* inherited_metadata, std::string* metadata,
1160
3
                                       bool* has_metadata) {
1161
3
    *has_metadata = false;
1162
3
    if (inherited_metadata != nullptr) {
1163
3
        *metadata = *inherited_metadata;
1164
3
        *has_metadata = true;
1165
3
    }
1166
1167
3
    const int metadata_idx = find_child_idx(variant_field, "metadata");
1168
3
    if (metadata_idx >= 0) {
1169
0
        bool metadata_present = false;
1170
0
        RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], metadata, &metadata_present));
1171
0
        *has_metadata = metadata_present;
1172
0
    }
1173
3
    return Status::OK();
1174
3
}
1175
1176
static Status variant_typed_value_to_json(const FieldSchema& variant_field, const Struct& fields,
1177
                                          const std::string& metadata, std::string* typed_json,
1178
3
                                          bool* typed_present) {
1179
3
    *typed_present = false;
1180
3
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
1181
3
    if (typed_value_idx < 0) {
1182
0
        return Status::OK();
1183
0
    }
1184
3
    return typed_value_to_json(variant_field.children[typed_value_idx], fields[typed_value_idx],
1185
3
                               metadata, typed_json, typed_present);
1186
3
}
1187
1188
static Status variant_residual_value_to_json(const FieldSchema& variant_field, const Struct& fields,
1189
                                             const std::string& metadata, bool has_metadata,
1190
3
                                             std::string* value_json, bool* value_present) {
1191
3
    *value_present = false;
1192
3
    const int value_idx = find_child_idx(variant_field, "value");
1193
3
    if (value_idx < 0) {
1194
0
        return Status::OK();
1195
0
    }
1196
1197
3
    std::string value;
1198
3
    RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, value_present));
1199
3
    if (!*value_present) {
1200
0
        return Status::OK();
1201
0
    }
1202
3
    if (!has_metadata) {
1203
0
        return Status::Corruption("Parquet VARIANT value is present without metadata");
1204
0
    }
1205
3
    return parquet::decode_variant_to_json(StringRef(metadata.data(), metadata.size()),
1206
3
                                           StringRef(value.data(), value.size()), value_json);
1207
3
}
1208
1209
static Status merge_variant_value_and_typed_json(const std::string& value_json,
1210
3
                                                 const std::string& typed_json, std::string* json) {
1211
3
    VariantMap value_values;
1212
3
    RETURN_IF_ERROR(parse_json_to_variant_map(value_json, PathInData(), &value_values));
1213
3
    VariantMap typed_values;
1214
3
    RETURN_IF_ERROR(parse_json_to_variant_map(typed_json, PathInData(), &typed_values));
1215
3
    erase_shadowed_empty_object_markers(&value_values, &typed_values);
1216
3
    auto root_value = value_values.find(PathInData());
1217
3
    if (root_value != value_values.end() && !is_empty_object_marker(root_value->second)) {
1218
0
        return Status::Corruption(
1219
0
                "Parquet VARIANT has conflicting non-object value and typed_value");
1220
0
    }
1221
3
    RETURN_IF_ERROR(
1222
3
            check_no_shredded_value_typed_duplicates(value_values, typed_values, PathInData()));
1223
2
    value_values.merge(std::move(typed_values));
1224
2
    return variant_map_to_json(std::move(value_values), json);
1225
3
}
1226
1227
static Status variant_to_json(const FieldSchema& variant_field, const Field& field,
1228
                              const std::string* inherited_metadata, std::string* json,
1229
3
                              bool* present) {
1230
3
    if (field.is_null()) {
1231
0
        *present = false;
1232
0
        return Status::OK();
1233
0
    }
1234
1235
3
    const auto& fields = field.get<TYPE_STRUCT>();
1236
3
    std::string metadata;
1237
3
    bool has_metadata = false;
1238
3
    RETURN_IF_ERROR(resolve_variant_metadata(variant_field, fields, inherited_metadata, &metadata,
1239
3
                                             &has_metadata));
1240
1241
3
    std::string typed_json;
1242
3
    bool typed_present = false;
1243
3
    RETURN_IF_ERROR(variant_typed_value_to_json(variant_field, fields, metadata, &typed_json,
1244
3
                                                &typed_present));
1245
1246
3
    std::string value_json;
1247
3
    bool value_present = false;
1248
3
    RETURN_IF_ERROR(variant_residual_value_to_json(variant_field, fields, metadata, has_metadata,
1249
3
                                                   &value_json, &value_present));
1250
1251
3
    if (value_present && typed_present) {
1252
3
        RETURN_IF_ERROR(merge_variant_value_and_typed_json(value_json, typed_json, json));
1253
2
        *present = true;
1254
2
        return Status::OK();
1255
3
    }
1256
1257
0
    if (typed_present) {
1258
0
        *json = std::move(typed_json);
1259
0
        *present = true;
1260
0
        return Status::OK();
1261
0
    }
1262
0
    if (value_present) {
1263
0
        *json = std::move(value_json);
1264
0
        *present = true;
1265
0
        return Status::OK();
1266
0
    }
1267
1268
0
    *present = false;
1269
0
    return Status::OK();
1270
0
}
1271
1272
static Status shredded_field_to_json(const FieldSchema& field_schema, const Field& field,
1273
                                     const std::string& metadata, std::string* json, bool* present,
1274
3
                                     bool allow_scalar_typed_value_only_wrapper) {
1275
3
    if (is_variant_wrapper_field(field_schema, allow_scalar_typed_value_only_wrapper)) {
1276
0
        return variant_to_json(field_schema, field, &metadata, json, present);
1277
0
    }
1278
3
    if (is_value_only_variant_wrapper_candidate(field_schema)) {
1279
0
        Status st = variant_to_json(field_schema, field, &metadata, json, present);
1280
0
        if (st.ok()) {
1281
0
            return st;
1282
0
        }
1283
0
        if (!st.is<ErrorCode::CORRUPTION>()) {
1284
0
            return st;
1285
0
        }
1286
0
    }
1287
3
    return typed_value_to_json(field_schema, field, metadata, json, present);
1288
3
}
1289
1290
static Status typed_array_to_json(const FieldSchema& typed_value_field, const Field& field,
1291
0
                                  const std::string& metadata, std::string* json, bool* present) {
1292
0
    if (field.is_null()) {
1293
0
        *present = false;
1294
0
        return Status::OK();
1295
0
    }
1296
0
    if (typed_value_field.children.empty()) {
1297
0
        return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
1298
0
    }
1299
1300
0
    const auto& elements = field.get<TYPE_ARRAY>();
1301
0
    const auto& element_schema = typed_value_field.children[0];
1302
0
    json->clear();
1303
0
    json->push_back('[');
1304
0
    for (size_t i = 0; i < elements.size(); ++i) {
1305
0
        if (i != 0) {
1306
0
            json->push_back(',');
1307
0
        }
1308
0
        std::string element_json;
1309
0
        bool element_present = false;
1310
0
        RETURN_IF_ERROR(shredded_field_to_json(element_schema, elements[i], metadata, &element_json,
1311
0
                                               &element_present, true));
1312
0
        if (!element_present) {
1313
0
            if (elements[i].is_null()) {
1314
0
                json->append("null");
1315
0
                continue;
1316
0
            }
1317
0
            return Status::Corruption("Parquet VARIANT array element is missing");
1318
0
        }
1319
0
        json->append(element_json);
1320
0
    }
1321
0
    json->push_back(']');
1322
0
    *present = true;
1323
0
    return Status::OK();
1324
0
}
1325
1326
static Status typed_struct_to_json(const FieldSchema& typed_value_field, const Field& field,
1327
3
                                   const std::string& metadata, std::string* json, bool* present) {
1328
3
    if (field.is_null()) {
1329
0
        *present = false;
1330
0
        return Status::OK();
1331
0
    }
1332
1333
3
    const auto& fields = field.get<TYPE_STRUCT>();
1334
3
    json->clear();
1335
3
    json->push_back('{');
1336
3
    bool first = true;
1337
6
    for (int i = 0; i < typed_value_field.children.size(); ++i) {
1338
3
        std::string child_json;
1339
3
        bool child_present = false;
1340
3
        RETURN_IF_ERROR(shredded_field_to_json(typed_value_field.children[i], fields[i], metadata,
1341
3
                                               &child_json, &child_present, false));
1342
3
        if (!child_present) {
1343
0
            continue;
1344
0
        }
1345
3
        if (!first) {
1346
0
            json->push_back(',');
1347
0
        }
1348
3
        append_json_string(typed_value_field.children[i].name, json);
1349
3
        json->push_back(':');
1350
3
        json->append(child_json);
1351
3
        first = false;
1352
3
    }
1353
3
    json->push_back('}');
1354
3
    *present = true;
1355
3
    return Status::OK();
1356
3
}
1357
1358
static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field,
1359
6
                                  const std::string& metadata, std::string* json, bool* present) {
1360
6
    const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type);
1361
6
    switch (typed_type->get_primitive_type()) {
1362
3
    case TYPE_STRUCT:
1363
3
        return typed_struct_to_json(typed_value_field, field, metadata, json, present);
1364
0
    case TYPE_ARRAY:
1365
0
        return typed_array_to_json(typed_value_field, field, metadata, json, present);
1366
0
    case TYPE_MAP: {
1367
0
        VariantMap values;
1368
0
        PathInDataBuilder path;
1369
0
        std::deque<std::string> string_values;
1370
0
        RETURN_IF_ERROR(typed_map_to_variant_map(typed_value_field, field, metadata, &path, &values,
1371
0
                                                 present, &string_values));
1372
0
        if (!*present) {
1373
0
            return Status::OK();
1374
0
        }
1375
0
        return variant_map_to_json(std::move(values), json);
1376
0
    }
1377
3
    default:
1378
3
        return scalar_typed_value_to_json(typed_value_field, field, json, present);
1379
6
    }
1380
6
}
1381
1382
static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1383
                                         const std::string& metadata, PathInDataBuilder* path,
1384
                                         VariantMap* values, bool* present,
1385
                                         std::deque<std::string>* string_values);
1386
1387
static Status variant_to_variant_map(const FieldSchema& variant_field, const Field& field,
1388
                                     const std::string* inherited_metadata, PathInDataBuilder* path,
1389
                                     VariantMap* values, bool* present,
1390
29
                                     std::deque<std::string>* string_values) {
1391
29
    if (field.is_null()) {
1392
0
        *present = false;
1393
0
        return Status::OK();
1394
0
    }
1395
29
    const auto& fields = field.get<TYPE_STRUCT>();
1396
29
    const int metadata_idx = find_child_idx(variant_field, "metadata");
1397
29
    const int value_idx = find_child_idx(variant_field, "value");
1398
29
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
1399
1400
29
    std::string metadata;
1401
29
    bool has_metadata = false;
1402
29
    if (inherited_metadata != nullptr) {
1403
3
        metadata = *inherited_metadata;
1404
3
        has_metadata = true;
1405
3
    }
1406
29
    if (metadata_idx >= 0) {
1407
26
        bool metadata_present = false;
1408
26
        RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], &metadata, &metadata_present));
1409
26
        has_metadata = metadata_present;
1410
26
    }
1411
1412
29
    VariantMap value_values;
1413
29
    bool value_present = false;
1414
29
    const PathInData current_path = path->build();
1415
29
    if (value_idx >= 0) {
1416
18
        std::string value;
1417
18
        RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, &value_present));
1418
18
        if (value_present) {
1419
10
            if (!has_metadata) {
1420
0
                return Status::Corruption("Parquet VARIANT value is present without metadata");
1421
0
            }
1422
10
            RETURN_IF_ERROR(parquet::decode_variant_to_variant_map(
1423
10
                    StringRef(metadata.data(), metadata.size()),
1424
10
                    StringRef(value.data(), value.size()), current_path, &value_values,
1425
10
                    string_values));
1426
10
        }
1427
18
    }
1428
1429
29
    VariantMap typed_values;
1430
29
    bool typed_present = false;
1431
29
    if (typed_value_idx >= 0) {
1432
24
        RETURN_IF_ERROR(typed_value_to_variant_map(variant_field.children[typed_value_idx],
1433
24
                                                   fields[typed_value_idx], metadata, path,
1434
24
                                                   &typed_values, &typed_present, string_values));
1435
24
    }
1436
1437
28
    erase_shadowed_empty_object_markers(&value_values, &typed_values);
1438
28
    auto current_value = value_values.find(current_path);
1439
28
    if (value_present && typed_present && current_value != value_values.end() &&
1440
28
        !is_empty_object_marker(current_value->second)) {
1441
0
        return Status::Corruption(
1442
0
                "Parquet VARIANT has conflicting non-object value and typed_value");
1443
0
    }
1444
28
    RETURN_IF_ERROR(
1445
28
            check_no_shredded_value_typed_duplicates(value_values, typed_values, current_path));
1446
27
    values->merge(std::move(value_values));
1447
27
    values->merge(std::move(typed_values));
1448
27
    *present = value_present || typed_present;
1449
27
    return Status::OK();
1450
28
}
1451
1452
static Status shredded_field_to_variant_map(const FieldSchema& field_schema, const Field& field,
1453
                                            const std::string& metadata, PathInDataBuilder* path,
1454
                                            VariantMap* values, bool* present,
1455
33
                                            std::deque<std::string>* string_values) {
1456
33
    if (is_variant_wrapper_field(field_schema, false)) {
1457
2
        return variant_to_variant_map(field_schema, field, &metadata, path, values, present,
1458
2
                                      string_values);
1459
2
    }
1460
31
    if (is_value_only_variant_wrapper_candidate(field_schema)) {
1461
1
        Status st = variant_to_variant_map(field_schema, field, &metadata, path, values, present,
1462
1
                                           string_values);
1463
1
        if (st.ok()) {
1464
1
            return st;
1465
1
        }
1466
0
        if (!st.is<ErrorCode::CORRUPTION>()) {
1467
0
            return st;
1468
0
        }
1469
0
    }
1470
30
    return typed_value_to_variant_map(field_schema, field, metadata, path, values, present,
1471
30
                                      string_values);
1472
31
}
1473
1474
static Status append_typed_field_to_variant_map(const FieldSchema& typed_value_field,
1475
                                                const Field& field, PathInDataBuilder* path,
1476
18
                                                VariantMap* values, bool* present) {
1477
18
    FieldWithDataType value;
1478
18
    RETURN_IF_ERROR(field_to_variant_field(typed_value_field, field, &value, present));
1479
18
    if (*present) {
1480
18
        (*values)[path->build()] = std::move(value);
1481
18
    }
1482
18
    return Status::OK();
1483
18
}
1484
1485
3
static void move_variant_map_to_field(VariantMap&& element_values, FieldWithDataType* value) {
1486
3
    if (element_values.size() == 1 && element_values.begin()->first.empty()) {
1487
1
        *value = std::move(element_values.begin()->second);
1488
1
        return;
1489
1
    }
1490
2
    value->field = Field::create_field<TYPE_VARIANT>(std::move(element_values));
1491
2
    fill_variant_field_info(value);
1492
2
}
1493
1494
static Status typed_array_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1495
                                         const std::string& metadata, PathInDataBuilder* path,
1496
                                         VariantMap* values, bool* present,
1497
6
                                         std::deque<std::string>* string_values) {
1498
6
    if ((contains_uuid_typed_value_field(typed_value_field) ||
1499
6
         contains_temporal_variant_leaf_type(typed_value_field.data_type) ||
1500
6
         contains_floating_point_variant_leaf_type(typed_value_field.data_type)) &&
1501
6
        is_direct_variant_leaf_type(typed_value_field.data_type)) {
1502
1
        FieldWithDataType value;
1503
1
        RETURN_IF_ERROR(fill_direct_array_variant_field(typed_value_field, field, &value, present));
1504
1
        if (*present) {
1505
1
            (*values)[path->build()] = std::move(value);
1506
1
        }
1507
1
        return Status::OK();
1508
1
    }
1509
5
    if (is_direct_variant_leaf_type(typed_value_field.data_type)) {
1510
1
        return append_typed_field_to_variant_map(typed_value_field, field, path, values, present);
1511
1
    }
1512
1513
4
    if (field.is_null()) {
1514
0
        *present = false;
1515
0
        return Status::OK();
1516
0
    }
1517
4
    if (typed_value_field.children.empty()) {
1518
0
        return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
1519
0
    }
1520
1521
4
    const auto& elements = field.get<TYPE_ARRAY>();
1522
4
    const auto& element_schema = typed_value_field.children[0];
1523
4
    Array array;
1524
4
    array.reserve(elements.size());
1525
5
    for (const auto& element : elements) {
1526
5
        VariantMap element_values;
1527
5
        bool element_present = false;
1528
5
        PathInDataBuilder element_path;
1529
5
        RETURN_IF_ERROR(shredded_field_to_variant_map(element_schema, element, metadata,
1530
5
                                                      &element_path, &element_values,
1531
5
                                                      &element_present, string_values));
1532
5
        if (!element_present) {
1533
2
            if (element.is_null()) {
1534
1
                array.push_back(Field());
1535
1
                continue;
1536
1
            }
1537
1
            return Status::Corruption("Parquet VARIANT array element is missing");
1538
2
        }
1539
1540
3
        FieldWithDataType element_value;
1541
3
        move_variant_map_to_field(std::move(element_values), &element_value);
1542
3
        array.push_back(std::move(element_value.field));
1543
3
    }
1544
1545
3
    FieldWithDataType value;
1546
3
    const size_t elements_count = array.size();
1547
3
    value.field = Field::create_field<TYPE_ARRAY>(std::move(array));
1548
3
    fill_variant_field_info(&value);
1549
3
    if (value.base_scalar_type_id == INVALID_TYPE) {
1550
1
        RETURN_IF_ERROR(make_jsonb_field(make_null_array_json(elements_count), &value));
1551
1
    }
1552
3
    (*values)[path->build()] = std::move(value);
1553
3
    *present = true;
1554
3
    return Status::OK();
1555
3
}
1556
1557
static Status typed_map_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1558
                                       const std::string& metadata, PathInDataBuilder* path,
1559
                                       VariantMap* values, bool* present,
1560
1
                                       std::deque<std::string>* string_values) {
1561
1
    if (field.is_null()) {
1562
0
        *present = false;
1563
0
        return Status::OK();
1564
0
    }
1565
1
    if (typed_value_field.children.size() != 2) {
1566
0
        return Status::Corruption("Parquet VARIANT map typed_value has {} child fields",
1567
0
                                  typed_value_field.children.size());
1568
0
    }
1569
1570
1
    const auto& map = field.get<TYPE_MAP>();
1571
1
    DORIS_CHECK(map.size() == 2);
1572
1
    DORIS_CHECK(map[0].get_type() == TYPE_ARRAY);
1573
1
    DORIS_CHECK(map[1].get_type() == TYPE_ARRAY);
1574
1
    const auto& keys = map[0].get<TYPE_ARRAY>();
1575
1
    const auto& value_fields = map[1].get<TYPE_ARRAY>();
1576
1
    DORIS_CHECK(keys.size() == value_fields.size());
1577
1578
1
    if (keys.empty()) {
1579
0
        RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values));
1580
0
        *present = true;
1581
0
        return Status::OK();
1582
0
    }
1583
1584
1
    std::set<std::string> object_keys;
1585
1
    const FieldSchema& key_field = typed_value_field.children[0];
1586
1
    const FieldSchema& value_field = typed_value_field.children[1];
1587
3
    for (size_t i = 0; i < keys.size(); ++i) {
1588
2
        std::string key;
1589
2
        bool key_present = false;
1590
2
        RETURN_IF_ERROR(get_binary_field(keys[i], &key, &key_present));
1591
2
        if (!key_present) {
1592
0
            return Status::Corruption("Parquet VARIANT map typed_value has null key {}",
1593
0
                                      key_field.name);
1594
0
        }
1595
2
        if (!object_keys.insert(key).second) {
1596
0
            return Status::Corruption("Parquet VARIANT map typed_value has duplicate key {}", key);
1597
0
        }
1598
1599
2
        path->append(key, false);
1600
2
        bool value_present = false;
1601
2
        Status st = shredded_field_to_variant_map(value_field, value_fields[i], metadata, path,
1602
2
                                                  values, &value_present, string_values);
1603
2
        if (!st.ok()) {
1604
0
            path->pop_back();
1605
0
            return st;
1606
0
        }
1607
2
        if (!value_present) {
1608
0
            (*values)[path->build()] = FieldWithDataType {.field = Field()};
1609
0
        }
1610
2
        path->pop_back();
1611
2
    }
1612
1
    *present = true;
1613
1
    return Status::OK();
1614
1
}
1615
1616
static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1617
                                         const std::string& metadata, PathInDataBuilder* path,
1618
                                         VariantMap* values, bool* present,
1619
54
                                         std::deque<std::string>* string_values) {
1620
54
    if (field.is_null()) {
1621
7
        *present = false;
1622
7
        return Status::OK();
1623
7
    }
1624
47
    const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type);
1625
47
    if (typed_type->get_primitive_type() == TYPE_STRUCT) {
1626
23
        const auto& fields = field.get<TYPE_STRUCT>();
1627
23
        *present = true;
1628
23
        bool has_present_child = false;
1629
49
        for (int i = 0; i < typed_value_field.children.size(); ++i) {
1630
26
            path->append(typed_value_field.children[i].name, false);
1631
26
            bool child_present = false;
1632
26
            RETURN_IF_ERROR(shredded_field_to_variant_map(typed_value_field.children[i], fields[i],
1633
26
                                                          metadata, path, values, &child_present,
1634
26
                                                          string_values));
1635
26
            has_present_child |= child_present;
1636
26
            path->pop_back();
1637
26
        }
1638
23
        if (!has_present_child) {
1639
3
            RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values));
1640
3
        }
1641
23
        return Status::OK();
1642
23
    }
1643
24
    if (typed_type->get_primitive_type() == TYPE_ARRAY) {
1644
6
        return typed_array_to_variant_map(typed_value_field, field, metadata, path, values, present,
1645
6
                                          string_values);
1646
6
    }
1647
18
    if (typed_type->get_primitive_type() == TYPE_MAP) {
1648
1
        return typed_map_to_variant_map(typed_value_field, field, metadata, path, values, present,
1649
1
                                        string_values);
1650
1
    }
1651
1652
17
    return append_typed_field_to_variant_map(typed_value_field, field, path, values, present);
1653
18
}
1654
1655
static bool direct_typed_value_present_at(const FieldSchema& field_schema, const IColumn& column,
1656
                                          size_t row, bool allow_variant_wrapper,
1657
                                          const std::set<uint64_t>& column_ids,
1658
15
                                          const std::vector<const NullMap*>& parent_null_maps) {
1659
15
    if (!has_selected_column(field_schema, column_ids) ||
1660
15
        has_direct_typed_parent_null(parent_null_maps, row)) {
1661
0
        return false;
1662
0
    }
1663
1664
15
    const IColumn* value_column = &column;
1665
15
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
1666
14
        const auto& null_map = nullable_column->get_null_map_data();
1667
14
        DCHECK_LT(row, null_map.size());
1668
14
        if (null_map[row]) {
1669
3
            return false;
1670
3
        }
1671
11
        value_column = &nullable_column->get_nested_column();
1672
11
    }
1673
1674
12
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
1675
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
1676
0
        DCHECK_GE(typed_value_idx, 0);
1677
0
        const auto& typed_struct = assert_cast<const ColumnStruct&>(*value_column);
1678
0
        return direct_typed_value_present_at(field_schema.children[typed_value_idx],
1679
0
                                             typed_struct.get_column(typed_value_idx), row, false,
1680
0
                                             column_ids, parent_null_maps);
1681
0
    }
1682
1683
12
    return true;
1684
12
}
1685
1686
static Status append_direct_typed_empty_object_markers(
1687
        const FieldSchema& field_schema, const ColumnStruct& struct_column, size_t start,
1688
        size_t rows, PathInDataBuilder* path, ColumnVariant* batch,
1689
14
        const std::set<uint64_t>& column_ids, const std::vector<const NullMap*>& parent_null_maps) {
1690
14
    DataTypePtr marker_type = make_nullable(std::make_shared<DataTypeJsonb>());
1691
14
    MutableColumnPtr marker_column = marker_type->create_column();
1692
14
    marker_column->insert_default();
1693
14
    bool has_marker = false;
1694
1695
14
    const PathInData marker_path = path->build();
1696
14
    Field empty_object;
1697
14
    RETURN_IF_ERROR(make_empty_object_field(&empty_object));
1698
36
    for (size_t i = 0; i < rows; ++i) {
1699
22
        const size_t row = start + i;
1700
22
        if (has_direct_typed_parent_null(parent_null_maps, row)) {
1701
7
            marker_column->insert_default();
1702
7
            has_marker |= marker_path.empty();
1703
7
            continue;
1704
7
        }
1705
1706
15
        bool has_present_child = false;
1707
18
        for (int child_idx = 0; child_idx < field_schema.children.size(); ++child_idx) {
1708
15
            if (direct_typed_value_present_at(field_schema.children[child_idx],
1709
15
                                              struct_column.get_column(child_idx), row, true,
1710
15
                                              column_ids, parent_null_maps)) {
1711
12
                has_present_child = true;
1712
12
                break;
1713
12
            }
1714
15
        }
1715
1716
15
        if (has_present_child) {
1717
12
            marker_column->insert_default();
1718
12
            continue;
1719
12
        }
1720
3
        marker_column->insert(empty_object);
1721
3
        has_marker = true;
1722
3
    }
1723
1724
14
    if (!has_marker) {
1725
6
        return Status::OK();
1726
6
    }
1727
8
    if (!batch->add_sub_column(marker_path, std::move(marker_column), marker_type)) {
1728
0
        return Status::Corruption("Failed to add Parquet VARIANT empty typed object marker {}",
1729
0
                                  marker_path.get_path());
1730
0
    }
1731
8
    return Status::OK();
1732
8
}
1733
1734
static Status append_direct_typed_column_to_batch(const FieldSchema& field_schema,
1735
                                                  const IColumn& column, size_t start, size_t rows,
1736
                                                  PathInDataBuilder* path, ColumnVariant* batch,
1737
                                                  bool allow_variant_wrapper,
1738
                                                  const std::set<uint64_t>& column_ids,
1739
36
                                                  std::vector<const NullMap*> parent_null_maps) {
1740
36
    if (!has_selected_column(field_schema, column_ids)) {
1741
0
        return Status::OK();
1742
0
    }
1743
1744
36
    const IColumn* value_column = &column;
1745
36
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
1746
35
        parent_null_maps.push_back(&nullable_column->get_null_map_data());
1747
35
        value_column = &nullable_column->get_nested_column();
1748
35
    }
1749
1750
36
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
1751
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
1752
0
        DCHECK_GE(typed_value_idx, 0);
1753
0
        const auto& typed_struct = assert_cast<const ColumnStruct&>(*value_column);
1754
0
        return append_direct_typed_column_to_batch(
1755
0
                field_schema.children[typed_value_idx], typed_struct.get_column(typed_value_idx),
1756
0
                start, rows, path, batch, false, column_ids, parent_null_maps);
1757
0
    }
1758
1759
36
    const auto& type = remove_nullable(field_schema.data_type);
1760
36
    if (type->get_primitive_type() == TYPE_STRUCT) {
1761
14
        const auto& struct_column = assert_cast<const ColumnStruct&>(*value_column);
1762
34
        for (int i = 0; i < field_schema.children.size(); ++i) {
1763
20
            if (!has_selected_column(field_schema.children[i], column_ids)) {
1764
0
                continue;
1765
0
            }
1766
20
            path->append(field_schema.children[i].name, false);
1767
20
            RETURN_IF_ERROR(append_direct_typed_column_to_batch(
1768
20
                    field_schema.children[i], struct_column.get_column(i), start, rows, path, batch,
1769
20
                    true, column_ids, parent_null_maps));
1770
20
            path->pop_back();
1771
20
        }
1772
14
        return append_direct_typed_empty_object_markers(field_schema, struct_column, start, rows,
1773
14
                                                        path, batch, column_ids, parent_null_maps);
1774
14
    }
1775
1776
22
    DataTypePtr variant_leaf_type = make_nullable(direct_variant_leaf_type(field_schema));
1777
22
    MutableColumnPtr variant_leaf = variant_leaf_type->create_column();
1778
22
    variant_leaf->insert_default();
1779
22
    if (type->get_primitive_type() == TYPE_ARRAY &&
1780
22
        (contains_uuid_typed_value_field(field_schema) ||
1781
4
         contains_temporal_variant_leaf_type(field_schema.data_type) ||
1782
4
         contains_floating_point_variant_leaf_type(field_schema.data_type))) {
1783
4
        RETURN_IF_ERROR(insert_direct_typed_array_leaf_range(
1784
4
                field_schema, *value_column, start, rows, parent_null_maps, variant_leaf.get()));
1785
18
    } else if (is_uuid_typed_value_field(field_schema)) {
1786
1
        RETURN_IF_ERROR(insert_direct_typed_uuid_leaf_range(*value_column, start, rows,
1787
1
                                                            parent_null_maps, variant_leaf.get()));
1788
17
    } else if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
1789
4
        insert_direct_typed_temporal_leaf_range(type->get_primitive_type(), *value_column, start,
1790
4
                                                rows, parent_null_maps, variant_leaf.get());
1791
13
    } else {
1792
13
        insert_direct_typed_leaf_range(*value_column, start, rows, parent_null_maps,
1793
13
                                       variant_leaf.get());
1794
13
    }
1795
22
    if (!batch->add_sub_column(path->build(), std::move(variant_leaf), variant_leaf_type)) {
1796
0
        return Status::Corruption("Failed to add Parquet VARIANT typed subcolumn {}",
1797
0
                                  path->build().get_path());
1798
0
    }
1799
22
    return Status::OK();
1800
22
}
1801
1802
static Status append_variant_struct_rows_to_column(
1803
        const FieldSchema& field_schema, const ColumnStruct& variant_struct_column,
1804
        const NullMap* struct_null_map, size_t start, size_t rows,
1805
        const std::set<uint64_t>& column_ids, ColumnPtr& doris_column,
1806
2
        ParquetColumnReader::ColumnStatistics* variant_statistics) {
1807
2
    DCHECK_LE(start + rows, variant_struct_column.size());
1808
1809
2
    MutableColumnPtr variant_column_ptr;
1810
2
    NullMap* null_map_ptr = nullptr;
1811
2
    auto mutable_column = doris_column->assume_mutable();
1812
2
    if (doris_column->is_nullable()) {
1813
0
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
1814
0
        variant_column_ptr = nullable_column->get_nested_column_ptr();
1815
0
        null_map_ptr = &nullable_column->get_null_map_data();
1816
2
    } else {
1817
2
        if (field_schema.data_type->is_nullable()) {
1818
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1819
0
        }
1820
2
        variant_column_ptr = std::move(mutable_column);
1821
2
    }
1822
2
    auto* variant_column = assert_cast<ColumnVariant*>(variant_column_ptr.get());
1823
1824
2
    const int typed_value_idx = find_child_idx(field_schema, "typed_value");
1825
2
    if (can_use_direct_typed_only_value(field_schema, column_ids)) {
1826
1
        variant_statistics->variant_direct_typed_value_read_rows += static_cast<int64_t>(rows);
1827
1
        MutableColumnPtr batch_variant_column =
1828
1
                ColumnVariant::create(variant_column->max_subcolumns_count(),
1829
1
                                      variant_column->enable_doc_mode(), rows + 1);
1830
1
        auto* batch_variant = assert_cast<ColumnVariant*>(batch_variant_column.get());
1831
1
        PathInDataBuilder path;
1832
1
        RETURN_IF_ERROR(append_direct_typed_column_to_batch(
1833
1
                field_schema.children[typed_value_idx],
1834
1
                variant_struct_column.get_column(typed_value_idx), start, rows, &path,
1835
1
                batch_variant, false, column_ids, {}));
1836
1
        variant_column->insert_range_from(*batch_variant_column, 1, rows);
1837
1
        if (null_map_ptr != nullptr) {
1838
0
            for (size_t i = start; i < start + rows; ++i) {
1839
0
                null_map_ptr->push_back(struct_null_map != nullptr && (*struct_null_map)[i]);
1840
0
            }
1841
0
        }
1842
1
        return Status::OK();
1843
1
    }
1844
1845
1
    variant_statistics->variant_rowwise_read_rows += static_cast<int64_t>(rows);
1846
3
    for (size_t i = start; i < start + rows; ++i) {
1847
2
        if (struct_null_map != nullptr && (*struct_null_map)[i]) {
1848
0
            if (null_map_ptr == nullptr) {
1849
0
                return Status::Corruption("Not nullable column has null values in parquet file");
1850
0
            }
1851
0
            variant_column->insert_default();
1852
0
            null_map_ptr->push_back(1);
1853
0
            continue;
1854
0
        }
1855
2
        VariantMap values;
1856
2
        bool present = false;
1857
2
        PathInDataBuilder path;
1858
2
        std::deque<std::string> string_values;
1859
2
        RETURN_IF_ERROR(variant_to_variant_map(field_schema, variant_struct_column[i], nullptr,
1860
2
                                               &path, &values, &present, &string_values));
1861
2
        if (!present) {
1862
0
            values[PathInData()] = FieldWithDataType {.field = Field()};
1863
0
        }
1864
2
        RETURN_IF_CATCH_EXCEPTION(
1865
2
                variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
1866
2
        if (null_map_ptr != nullptr) {
1867
0
            null_map_ptr->push_back(0);
1868
0
        }
1869
2
    }
1870
1
    return Status::OK();
1871
1
}
1872
1873
#ifdef BE_TEST
1874
namespace parquet_variant_reader_test {
1875
14
bool can_direct_read_typed_value_for_test(const FieldSchema& typed_value_field) {
1876
14
    const std::set<uint64_t> column_ids;
1877
14
    return can_direct_read_typed_value(typed_value_field, false, column_ids);
1878
14
}
1879
1880
bool can_use_direct_typed_only_value_for_test(const FieldSchema& variant_field,
1881
6
                                              const std::set<uint64_t>& column_ids) {
1882
6
    return can_use_direct_typed_only_value(variant_field, column_ids);
1883
6
}
1884
1885
Status append_direct_typed_column_to_batch_for_test(const FieldSchema& typed_value_field,
1886
                                                    const IColumn& typed_value_column, size_t start,
1887
15
                                                    size_t rows, ColumnVariant* batch) {
1888
15
    PathInDataBuilder path;
1889
15
    const std::set<uint64_t> column_ids;
1890
15
    return append_direct_typed_column_to_batch(typed_value_field, typed_value_column, start, rows,
1891
15
                                               &path, batch, false, column_ids, {});
1892
15
}
1893
1894
Status read_variant_row_for_test(const FieldSchema& variant_field, const Field& field,
1895
25
                                 bool output_nullable, Field* result, bool* sql_null) {
1896
25
    if (field.is_null()) {
1897
1
        if (!output_nullable) {
1898
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1899
0
        }
1900
1
        *sql_null = true;
1901
1
        return Status::OK();
1902
1
    }
1903
1904
24
    VariantMap values;
1905
24
    bool present = false;
1906
24
    PathInDataBuilder path;
1907
24
    std::deque<std::string> string_values;
1908
24
    RETURN_IF_ERROR(variant_to_variant_map(variant_field, field, nullptr, &path, &values, &present,
1909
24
                                           &string_values));
1910
22
    if (!present) {
1911
1
        values[PathInData()] = FieldWithDataType {.field = Field()};
1912
1
    }
1913
1914
22
    auto variant_column = ColumnVariant::create(0, false);
1915
22
    RETURN_IF_CATCH_EXCEPTION(
1916
22
            variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
1917
22
    variant_column->get(0, *result);
1918
22
    *sql_null = false;
1919
22
    return Status::OK();
1920
22
}
1921
1922
Status read_variant_rows_for_test(const FieldSchema& variant_field, const IColumn& struct_column,
1923
                                  const std::set<uint64_t>& column_ids, ColumnPtr& doris_column,
1924
2
                                  int64_t* direct_rows, int64_t* rowwise_rows) {
1925
2
    const IColumn* struct_source = &struct_column;
1926
2
    const NullMap* struct_null_map = nullptr;
1927
2
    if (const auto* nullable_struct = check_and_get_column<ColumnNullable>(struct_source)) {
1928
0
        struct_null_map = &nullable_struct->get_null_map_data();
1929
0
        struct_source = &nullable_struct->get_nested_column();
1930
0
    }
1931
2
    const auto& variant_struct_column = assert_cast<const ColumnStruct&>(*struct_source);
1932
1933
2
    ParquetColumnReader::ColumnStatistics variant_statistics;
1934
2
    RETURN_IF_ERROR(append_variant_struct_rows_to_column(
1935
2
            variant_field, variant_struct_column, struct_null_map, 0, variant_struct_column.size(),
1936
2
            column_ids, doris_column, &variant_statistics));
1937
2
    *direct_rows = variant_statistics.variant_direct_typed_value_read_rows;
1938
2
    *rowwise_rows = variant_statistics.variant_rowwise_read_rows;
1939
2
    return Status::OK();
1940
2
}
1941
1942
Status variant_to_json_for_test(const FieldSchema& variant_field, const Field& field,
1943
                                const std::string& inherited_metadata, std::string* json,
1944
3
                                bool* present) {
1945
3
    return variant_to_json(variant_field, field, &inherited_metadata, json, present);
1946
3
}
1947
1948
1
bool variant_struct_reader_type_is_nullable_for_test(const FieldSchema& variant_field) {
1949
1
    return make_variant_struct_reader_type(variant_field)->is_nullable();
1950
1
}
1951
1952
2
bool variant_struct_reader_column_is_nullable_for_test(const FieldSchema& variant_field) {
1953
2
    auto variant_struct_type = make_variant_struct_reader_type(variant_field);
1954
2
    return make_variant_struct_read_column(variant_field, variant_struct_type)->is_nullable();
1955
2
}
1956
} // namespace parquet_variant_reader_test
1957
#endif
1958
1959
// Existing recursive factory keeps nested reader wiring and shared state in one dispatch point.
1960
// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
1961
Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
1962
                                   const tparquet::RowGroup& row_group, const RowRanges& row_ranges,
1963
                                   const cctz::time_zone* ctz, io::IOContext* io_ctx,
1964
                                   std::unique_ptr<ParquetColumnReader>& reader,
1965
                                   size_t max_buf_size,
1966
                                   std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
1967
                                   RuntimeState* state, bool in_collection,
1968
                                   const std::set<uint64_t>& column_ids,
1969
132
                                   const std::set<uint64_t>& filter_column_ids) {
1970
132
    size_t total_rows = row_group.num_rows;
1971
132
    const auto field_primitive_type = remove_nullable(field->data_type)->get_primitive_type();
1972
132
    if (field_primitive_type == TYPE_ARRAY) {
1973
2
        const bool offset_only = !column_ids.empty() &&
1974
2
                                 column_ids.contains(field->get_column_id()) &&
1975
2
                                 !column_ids.contains(field->children[0].get_column_id());
1976
2
        std::unique_ptr<ParquetColumnReader> element_reader;
1977
2
        RETURN_IF_ERROR(create(file, field->children.data(), row_group, row_ranges, ctz, io_ctx,
1978
2
                               element_reader, max_buf_size, col_offsets, state, true, column_ids,
1979
2
                               filter_column_ids));
1980
2
        auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
1981
2
        element_reader->set_column_in_nested();
1982
2
        RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field, offset_only));
1983
2
        array_reader->_filter_column_ids = filter_column_ids;
1984
2
        reader.reset(array_reader.release());
1985
130
    } else if (field_primitive_type == TYPE_MAP) {
1986
0
        std::unique_ptr<ParquetColumnReader> key_reader;
1987
0
        std::unique_ptr<ParquetColumnReader> value_reader;
1988
1989
0
        if (column_ids.empty() ||
1990
0
            column_ids.find(field->children[0].get_column_id()) != column_ids.end()) {
1991
            // Create key reader
1992
0
            RETURN_IF_ERROR(create(file, field->children.data(), row_group, row_ranges, ctz, io_ctx,
1993
0
                                   key_reader, max_buf_size, col_offsets, state, true, column_ids,
1994
0
                                   filter_column_ids));
1995
0
        } else {
1996
0
            auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
1997
0
                                                                   io_ctx, field->children.data());
1998
0
            key_reader = std::move(skip_reader);
1999
0
        }
2000
2001
0
        if (column_ids.empty() ||
2002
0
            column_ids.find(field->children[1].get_column_id()) != column_ids.end()) {
2003
            // Create value reader
2004
0
            RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
2005
0
                                   value_reader, max_buf_size, col_offsets, state, true, column_ids,
2006
0
                                   filter_column_ids));
2007
0
        } else {
2008
0
            auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
2009
0
                                                                   io_ctx, &field->children[1]);
2010
0
            value_reader = std::move(skip_reader);
2011
0
        }
2012
2013
0
        auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
2014
0
        key_reader->set_column_in_nested();
2015
0
        value_reader->set_column_in_nested();
2016
0
        RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
2017
0
        map_reader->_filter_column_ids = filter_column_ids;
2018
0
        reader.reset(map_reader.release());
2019
130
    } else if (field_primitive_type == TYPE_STRUCT) {
2020
11
        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> child_readers;
2021
11
        child_readers.reserve(field->children.size());
2022
11
        int non_skip_reader_idx = -1;
2023
37
        for (int i = 0; i < field->children.size(); ++i) {
2024
26
            auto& child = field->children[i];
2025
26
            std::unique_ptr<ParquetColumnReader> child_reader;
2026
26
            if (column_ids.empty() || column_ids.find(child.get_column_id()) != column_ids.end()) {
2027
22
                RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx,
2028
22
                                       child_reader, max_buf_size, col_offsets, state,
2029
22
                                       in_collection, column_ids, filter_column_ids));
2030
22
                child_readers[child.name] = std::move(child_reader);
2031
                // Record the first non-SkippingReader
2032
22
                if (non_skip_reader_idx == -1) {
2033
11
                    non_skip_reader_idx = i;
2034
11
                }
2035
22
            } else {
2036
4
                auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
2037
4
                                                                       io_ctx, &child);
2038
4
                skip_reader->_filter_column_ids = filter_column_ids;
2039
4
                child_readers[child.name] = std::move(skip_reader);
2040
4
            }
2041
26
            child_readers[child.name]->set_column_in_nested();
2042
26
        }
2043
        // If all children are SkipReadingReader, force the first child to call create
2044
11
        if (non_skip_reader_idx == -1) {
2045
0
            std::unique_ptr<ParquetColumnReader> child_reader;
2046
0
            RETURN_IF_ERROR(create(file, field->children.data(), row_group, row_ranges, ctz, io_ctx,
2047
0
                                   child_reader, max_buf_size, col_offsets, state, in_collection,
2048
0
                                   column_ids, filter_column_ids));
2049
0
            child_reader->set_column_in_nested();
2050
0
            child_readers[field->children[0].name] = std::move(child_reader);
2051
0
        }
2052
11
        auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
2053
11
        RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
2054
11
        struct_reader->_filter_column_ids = filter_column_ids;
2055
11
        reader.reset(struct_reader.release());
2056
119
    } else if (field_primitive_type == TYPE_VARIANT) {
2057
0
        auto variant_reader =
2058
0
                VariantColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
2059
0
        RETURN_IF_ERROR(variant_reader->init(file, field, row_group, max_buf_size, col_offsets,
2060
0
                                             state, in_collection, column_ids, filter_column_ids));
2061
0
        variant_reader->_filter_column_ids = filter_column_ids;
2062
0
        reader.reset(variant_reader.release());
2063
119
    } else {
2064
119
        auto physical_index = field->physical_column_index;
2065
119
        const tparquet::OffsetIndex* offset_index =
2066
119
                col_offsets.find(physical_index) != col_offsets.end() ? &col_offsets[physical_index]
2067
119
                                                                      : nullptr;
2068
2069
119
        const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
2070
119
        if (in_collection) {
2071
3
            if (offset_index == nullptr) {
2072
3
                auto scalar_reader = ScalarColumnReader<true, false>::create_unique(
2073
3
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
2074
2075
3
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
2076
3
                scalar_reader->_filter_column_ids = filter_column_ids;
2077
3
                reader.reset(scalar_reader.release());
2078
3
            } else {
2079
0
                auto scalar_reader = ScalarColumnReader<true, true>::create_unique(
2080
0
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
2081
2082
0
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
2083
0
                scalar_reader->_filter_column_ids = filter_column_ids;
2084
0
                reader.reset(scalar_reader.release());
2085
0
            }
2086
116
        } else {
2087
116
            if (offset_index == nullptr) {
2088
116
                auto scalar_reader = ScalarColumnReader<false, false>::create_unique(
2089
116
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
2090
2091
116
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
2092
116
                scalar_reader->_filter_column_ids = filter_column_ids;
2093
116
                reader.reset(scalar_reader.release());
2094
116
            } else {
2095
0
                auto scalar_reader = ScalarColumnReader<false, true>::create_unique(
2096
0
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
2097
2098
0
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
2099
0
                scalar_reader->_filter_column_ids = filter_column_ids;
2100
0
                reader.reset(scalar_reader.release());
2101
0
            }
2102
116
        }
2103
119
    }
2104
132
    return Status::OK();
2105
132
}
2106
2107
void ParquetColumnReader::_generate_read_ranges(RowRange page_row_range,
2108
274
                                                RowRanges* result_ranges) const {
2109
274
    result_ranges->add(page_row_range);
2110
274
    RowRanges::ranges_intersection(*result_ranges, _row_ranges, result_ranges);
2111
274
}
2112
2113
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2114
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::init(io::FileReaderSPtr file,
2115
                                                             FieldSchema* field,
2116
                                                             size_t max_buf_size,
2117
119
                                                             RuntimeState* state) {
2118
119
    _field_schema = field;
2119
119
    auto& chunk_meta = _chunk_meta.meta_data;
2120
119
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
2121
119
                                                    : chunk_meta.data_page_offset;
2122
119
    size_t chunk_len = chunk_meta.total_compressed_size;
2123
119
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
2124
119
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
2125
119
         typeid_cast<io::MergeRangeFileReader*>(
2126
53
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
2127
119
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
2128
        // turn off prefetch data when using MergeRangeFileReader
2129
119
        prefetch_buffer_size = 0;
2130
119
    }
2131
119
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
2132
119
                                                                    prefetch_buffer_size);
2133
119
    ParquetPageReadContext ctx(
2134
119
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
2135
2136
119
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
2137
119
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
2138
119
    RETURN_IF_ERROR(_chunk_reader->init());
2139
119
    return Status::OK();
2140
119
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
_ZN5doris18ScalarColumnReaderILb1ELb0EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
Line
Count
Source
2117
3
                                                             RuntimeState* state) {
2118
3
    _field_schema = field;
2119
3
    auto& chunk_meta = _chunk_meta.meta_data;
2120
3
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
2121
3
                                                    : chunk_meta.data_page_offset;
2122
3
    size_t chunk_len = chunk_meta.total_compressed_size;
2123
3
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
2124
3
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
2125
3
         typeid_cast<io::MergeRangeFileReader*>(
2126
0
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
2127
3
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
2128
        // turn off prefetch data when using MergeRangeFileReader
2129
3
        prefetch_buffer_size = 0;
2130
3
    }
2131
3
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
2132
3
                                                                    prefetch_buffer_size);
2133
3
    ParquetPageReadContext ctx(
2134
3
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
2135
2136
3
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
2137
3
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
2138
3
    RETURN_IF_ERROR(_chunk_reader->init());
2139
3
    return Status::OK();
2140
3
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
_ZN5doris18ScalarColumnReaderILb0ELb0EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
Line
Count
Source
2117
116
                                                             RuntimeState* state) {
2118
116
    _field_schema = field;
2119
116
    auto& chunk_meta = _chunk_meta.meta_data;
2120
116
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
2121
116
                                                    : chunk_meta.data_page_offset;
2122
116
    size_t chunk_len = chunk_meta.total_compressed_size;
2123
116
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
2124
116
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
2125
116
         typeid_cast<io::MergeRangeFileReader*>(
2126
53
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
2127
116
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
2128
        // turn off prefetch data when using MergeRangeFileReader
2129
116
        prefetch_buffer_size = 0;
2130
116
    }
2131
116
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
2132
116
                                                                    prefetch_buffer_size);
2133
116
    ParquetPageReadContext ctx(
2134
116
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
2135
2136
116
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
2137
116
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
2138
116
    RETURN_IF_ERROR(_chunk_reader->init());
2139
116
    return Status::OK();
2140
116
}
2141
2142
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2143
244
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_skip_values(size_t num_values) {
2144
244
    if (num_values == 0) {
2145
142
        return Status::OK();
2146
142
    }
2147
102
    if (_chunk_reader->max_def_level() > 0) {
2148
102
        LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
2149
102
        size_t skipped = 0;
2150
102
        size_t null_size = 0;
2151
102
        size_t nonnull_size = 0;
2152
217
        while (skipped < num_values) {
2153
115
            level_t def_level = -1;
2154
115
            size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
2155
115
            if (loop_skip == 0) {
2156
0
                std::stringstream ss;
2157
0
                const auto& bit_reader = def_decoder.rle_decoder().bit_reader();
2158
0
                ss << "def_decoder buffer (hex): ";
2159
0
                for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
2160
0
                    ss << std::hex << std::setw(2) << std::setfill('0')
2161
0
                       << static_cast<int>(bit_reader.buffer()[i]) << " ";
2162
0
                }
2163
0
                LOG(WARNING) << ss.str();
2164
0
                return Status::InternalError("Failed to decode definition level.");
2165
0
            }
2166
115
            if (def_level < _field_schema->definition_level) {
2167
8
                null_size += loop_skip;
2168
107
            } else {
2169
107
                nonnull_size += loop_skip;
2170
107
            }
2171
115
            skipped += loop_skip;
2172
115
        }
2173
102
        if (null_size > 0) {
2174
5
            RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
2175
5
        }
2176
102
        if (nonnull_size > 0) {
2177
101
            RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
2178
101
        }
2179
102
    } else {
2180
0
        RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
2181
0
    }
2182
102
    return Status::OK();
2183
102
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE12_skip_valuesEm
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE12_skip_valuesEm
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE12_skip_valuesEm
_ZN5doris18ScalarColumnReaderILb0ELb0EE12_skip_valuesEm
Line
Count
Source
2143
244
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_skip_values(size_t num_values) {
2144
244
    if (num_values == 0) {
2145
142
        return Status::OK();
2146
142
    }
2147
102
    if (_chunk_reader->max_def_level() > 0) {
2148
102
        LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
2149
102
        size_t skipped = 0;
2150
102
        size_t null_size = 0;
2151
102
        size_t nonnull_size = 0;
2152
217
        while (skipped < num_values) {
2153
115
            level_t def_level = -1;
2154
115
            size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
2155
115
            if (loop_skip == 0) {
2156
0
                std::stringstream ss;
2157
0
                const auto& bit_reader = def_decoder.rle_decoder().bit_reader();
2158
0
                ss << "def_decoder buffer (hex): ";
2159
0
                for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
2160
0
                    ss << std::hex << std::setw(2) << std::setfill('0')
2161
0
                       << static_cast<int>(bit_reader.buffer()[i]) << " ";
2162
0
                }
2163
0
                LOG(WARNING) << ss.str();
2164
0
                return Status::InternalError("Failed to decode definition level.");
2165
0
            }
2166
115
            if (def_level < _field_schema->definition_level) {
2167
8
                null_size += loop_skip;
2168
107
            } else {
2169
107
                nonnull_size += loop_skip;
2170
107
            }
2171
115
            skipped += loop_skip;
2172
115
        }
2173
102
        if (null_size > 0) {
2174
5
            RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
2175
5
        }
2176
102
        if (nonnull_size > 0) {
2177
101
            RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
2178
101
        }
2179
102
    } else {
2180
0
        RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
2181
0
    }
2182
102
    return Status::OK();
2183
102
}
2184
2185
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2186
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_values,
2187
                                                                     ColumnPtr& doris_column,
2188
                                                                     DataTypePtr& type,
2189
                                                                     FilterMap& filter_map,
2190
244
                                                                     bool is_dict_filter) {
2191
244
    if (num_values == 0) {
2192
0
        return Status::OK();
2193
0
    }
2194
244
    MutableColumnPtr data_column;
2195
244
    std::vector<uint16_t> null_map;
2196
244
    NullMap* map_data_column = nullptr;
2197
244
    if (doris_column->is_nullable()) {
2198
242
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2199
        // doris_column either originates from a mutable block in vparquet_group_reader
2200
        // or is a newly created ColumnPtr, and therefore can be modified.
2201
242
        auto* nullable_column =
2202
242
                assert_cast<ColumnNullable*>(const_cast<IColumn*>(doris_column.get()));
2203
2204
242
        data_column = nullable_column->get_nested_column_ptr();
2205
242
        map_data_column = &(nullable_column->get_null_map_data());
2206
242
        if (_chunk_reader->max_def_level() > 0) {
2207
174
            LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
2208
174
            size_t has_read = 0;
2209
174
            bool prev_is_null = true;
2210
348
            while (has_read < num_values) {
2211
174
                level_t def_level;
2212
174
                size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
2213
174
                if (loop_read == 0) {
2214
0
                    std::stringstream ss;
2215
0
                    const auto& bit_reader = def_decoder.rle_decoder().bit_reader();
2216
0
                    ss << "def_decoder buffer (hex): ";
2217
0
                    for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
2218
0
                        ss << std::hex << std::setw(2) << std::setfill('0')
2219
0
                           << static_cast<int>(bit_reader.buffer()[i]) << " ";
2220
0
                    }
2221
0
                    LOG(WARNING) << ss.str();
2222
0
                    return Status::InternalError("Failed to decode definition level.");
2223
0
                }
2224
2225
174
                bool is_null = def_level < _field_schema->definition_level;
2226
174
                if (!(prev_is_null ^ is_null)) {
2227
57
                    null_map.emplace_back(0);
2228
57
                }
2229
174
                size_t remaining = loop_read;
2230
174
                while (remaining > USHRT_MAX) {
2231
0
                    null_map.emplace_back(USHRT_MAX);
2232
0
                    null_map.emplace_back(0);
2233
0
                    remaining -= USHRT_MAX;
2234
0
                }
2235
174
                null_map.emplace_back((u_short)remaining);
2236
174
                prev_is_null = is_null;
2237
174
                has_read += loop_read;
2238
174
            }
2239
174
        }
2240
242
    } else {
2241
2
        if (_chunk_reader->max_def_level() > 0) {
2242
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2243
0
        }
2244
2
        data_column = doris_column->assume_mutable();
2245
2
    }
2246
244
    if (null_map.empty()) {
2247
70
        size_t remaining = num_values;
2248
70
        while (remaining > USHRT_MAX) {
2249
0
            null_map.emplace_back(USHRT_MAX);
2250
0
            null_map.emplace_back(0);
2251
0
            remaining -= USHRT_MAX;
2252
0
        }
2253
70
        null_map.emplace_back((u_short)remaining);
2254
70
    }
2255
244
    ColumnSelectVector select_vector;
2256
244
    {
2257
244
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2258
244
        RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map,
2259
244
                                           _filter_map_index));
2260
244
        _filter_map_index += num_values;
2261
244
    }
2262
0
    return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
2263
244
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
_ZN5doris18ScalarColumnReaderILb0ELb0EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Line
Count
Source
2190
244
                                                                     bool is_dict_filter) {
2191
244
    if (num_values == 0) {
2192
0
        return Status::OK();
2193
0
    }
2194
244
    MutableColumnPtr data_column;
2195
244
    std::vector<uint16_t> null_map;
2196
244
    NullMap* map_data_column = nullptr;
2197
244
    if (doris_column->is_nullable()) {
2198
242
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2199
        // doris_column either originates from a mutable block in vparquet_group_reader
2200
        // or is a newly created ColumnPtr, and therefore can be modified.
2201
242
        auto* nullable_column =
2202
242
                assert_cast<ColumnNullable*>(const_cast<IColumn*>(doris_column.get()));
2203
2204
242
        data_column = nullable_column->get_nested_column_ptr();
2205
242
        map_data_column = &(nullable_column->get_null_map_data());
2206
242
        if (_chunk_reader->max_def_level() > 0) {
2207
174
            LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
2208
174
            size_t has_read = 0;
2209
174
            bool prev_is_null = true;
2210
348
            while (has_read < num_values) {
2211
174
                level_t def_level;
2212
174
                size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
2213
174
                if (loop_read == 0) {
2214
0
                    std::stringstream ss;
2215
0
                    const auto& bit_reader = def_decoder.rle_decoder().bit_reader();
2216
0
                    ss << "def_decoder buffer (hex): ";
2217
0
                    for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
2218
0
                        ss << std::hex << std::setw(2) << std::setfill('0')
2219
0
                           << static_cast<int>(bit_reader.buffer()[i]) << " ";
2220
0
                    }
2221
0
                    LOG(WARNING) << ss.str();
2222
0
                    return Status::InternalError("Failed to decode definition level.");
2223
0
                }
2224
2225
174
                bool is_null = def_level < _field_schema->definition_level;
2226
174
                if (!(prev_is_null ^ is_null)) {
2227
57
                    null_map.emplace_back(0);
2228
57
                }
2229
174
                size_t remaining = loop_read;
2230
174
                while (remaining > USHRT_MAX) {
2231
0
                    null_map.emplace_back(USHRT_MAX);
2232
0
                    null_map.emplace_back(0);
2233
0
                    remaining -= USHRT_MAX;
2234
0
                }
2235
174
                null_map.emplace_back((u_short)remaining);
2236
174
                prev_is_null = is_null;
2237
174
                has_read += loop_read;
2238
174
            }
2239
174
        }
2240
242
    } else {
2241
2
        if (_chunk_reader->max_def_level() > 0) {
2242
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2243
0
        }
2244
2
        data_column = doris_column->assume_mutable();
2245
2
    }
2246
244
    if (null_map.empty()) {
2247
70
        size_t remaining = num_values;
2248
70
        while (remaining > USHRT_MAX) {
2249
0
            null_map.emplace_back(USHRT_MAX);
2250
0
            null_map.emplace_back(0);
2251
0
            remaining -= USHRT_MAX;
2252
0
        }
2253
70
        null_map.emplace_back((u_short)remaining);
2254
70
    }
2255
244
    ColumnSelectVector select_vector;
2256
244
    {
2257
244
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2258
244
        RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map,
2259
244
                                           _filter_map_index));
2260
244
        _filter_map_index += num_values;
2261
244
    }
2262
0
    return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
2263
244
}
2264
2265
/**
2266
 * Load the nested column data of complex type.
2267
 * A row of complex type may be stored across two(or more) pages, and the parameter `align_rows` indicates that
2268
 * whether the reader should read the remaining value of the last row in previous page.
2269
 */
2270
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2271
// Existing nested scalar reader is the central row/page alignment loop for complex values.
2272
// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
2273
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_nested_column(
2274
        ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, size_t batch_size,
2275
13
        size_t* read_rows, bool* eof, bool is_dict_filter) {
2276
13
    _rep_levels.clear();
2277
13
    _def_levels.clear();
2278
2279
    // Handle nullable columns
2280
13
    MutableColumnPtr data_column;
2281
13
    NullMap* map_data_column = nullptr;
2282
13
    if (doris_column->is_nullable()) {
2283
13
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2284
        // doris_column either originates from a mutable block in vparquet_group_reader
2285
        // or is a newly created ColumnPtr, and therefore can be modified.
2286
13
        auto* nullable_column =
2287
13
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
2288
13
        data_column = nullable_column->get_nested_column_ptr();
2289
13
        map_data_column = &(nullable_column->get_null_map_data());
2290
13
    } else {
2291
0
        if (_field_schema->data_type->is_nullable()) {
2292
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2293
0
        }
2294
0
        data_column = doris_column->assume_mutable();
2295
0
    }
2296
2297
13
    std::vector<uint16_t> null_map;
2298
13
    std::unordered_set<size_t> ancestor_null_indices;
2299
13
    std::vector<uint8_t> nested_filter_map_data;
2300
2301
13
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
2302
13
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2303
13
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2304
13
        if (filter_map.has_filter()) {
2305
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2306
0
                                           _rep_levels.size(), nested_filter_map_data,
2307
0
                                           &nested_filter_map));
2308
0
        }
2309
2310
13
        null_map.clear();
2311
13
        ancestor_null_indices.clear();
2312
13
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
2313
13
                                            ancestor_null_indices));
2314
2315
13
        ColumnSelectVector select_vector;
2316
13
        {
2317
13
            SCOPED_RAW_TIMER(&_decode_null_map_time);
2318
13
            RETURN_IF_ERROR(select_vector.init(
2319
13
                    null_map,
2320
13
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
2321
13
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
2322
13
        }
2323
2324
13
        RETURN_IF_ERROR(
2325
13
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
2326
13
        if (!ancestor_null_indices.empty()) {
2327
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
2328
0
        }
2329
13
        if (filter_map.has_filter()) {
2330
0
            auto new_rep_sz = before_rep_level_sz;
2331
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2332
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
2333
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
2334
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
2335
0
                    new_rep_sz++;
2336
0
                }
2337
0
            }
2338
0
            _rep_levels.resize(new_rep_sz);
2339
0
            _def_levels.resize(new_rep_sz);
2340
0
        }
2341
13
        return Status::OK();
2342
13
    };
Unexecuted instantiation: _ZZN5doris18ScalarColumnReaderILb1ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
_ZZN5doris18ScalarColumnReaderILb1ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
Line
Count
Source
2301
3
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
2302
3
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2303
3
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2304
3
        if (filter_map.has_filter()) {
2305
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2306
0
                                           _rep_levels.size(), nested_filter_map_data,
2307
0
                                           &nested_filter_map));
2308
0
        }
2309
2310
3
        null_map.clear();
2311
3
        ancestor_null_indices.clear();
2312
3
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
2313
3
                                            ancestor_null_indices));
2314
2315
3
        ColumnSelectVector select_vector;
2316
3
        {
2317
3
            SCOPED_RAW_TIMER(&_decode_null_map_time);
2318
3
            RETURN_IF_ERROR(select_vector.init(
2319
3
                    null_map,
2320
3
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
2321
3
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
2322
3
        }
2323
2324
3
        RETURN_IF_ERROR(
2325
3
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
2326
3
        if (!ancestor_null_indices.empty()) {
2327
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
2328
0
        }
2329
3
        if (filter_map.has_filter()) {
2330
0
            auto new_rep_sz = before_rep_level_sz;
2331
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2332
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
2333
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
2334
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
2335
0
                    new_rep_sz++;
2336
0
                }
2337
0
            }
2338
0
            _rep_levels.resize(new_rep_sz);
2339
0
            _def_levels.resize(new_rep_sz);
2340
0
        }
2341
3
        return Status::OK();
2342
3
    };
Unexecuted instantiation: _ZZN5doris18ScalarColumnReaderILb0ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
_ZZN5doris18ScalarColumnReaderILb0ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
Line
Count
Source
2301
10
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
2302
10
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2303
10
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2304
10
        if (filter_map.has_filter()) {
2305
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2306
0
                                           _rep_levels.size(), nested_filter_map_data,
2307
0
                                           &nested_filter_map));
2308
0
        }
2309
2310
10
        null_map.clear();
2311
10
        ancestor_null_indices.clear();
2312
10
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
2313
10
                                            ancestor_null_indices));
2314
2315
10
        ColumnSelectVector select_vector;
2316
10
        {
2317
10
            SCOPED_RAW_TIMER(&_decode_null_map_time);
2318
10
            RETURN_IF_ERROR(select_vector.init(
2319
10
                    null_map,
2320
10
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
2321
10
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
2322
10
        }
2323
2324
10
        RETURN_IF_ERROR(
2325
10
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
2326
10
        if (!ancestor_null_indices.empty()) {
2327
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
2328
0
        }
2329
10
        if (filter_map.has_filter()) {
2330
0
            auto new_rep_sz = before_rep_level_sz;
2331
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2332
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
2333
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
2334
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
2335
0
                    new_rep_sz++;
2336
0
                }
2337
0
            }
2338
0
            _rep_levels.resize(new_rep_sz);
2339
0
            _def_levels.resize(new_rep_sz);
2340
0
        }
2341
10
        return Status::OK();
2342
10
    };
2343
2344
15
    while (_current_range_idx < _row_ranges.range_size()) {
2345
13
        size_t left_row =
2346
13
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
2347
13
        size_t right_row = std::min(left_row + batch_size - *read_rows,
2348
13
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
2349
13
        _current_row_index = left_row;
2350
13
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
2351
13
        size_t load_rows = 0;
2352
13
        bool cross_page = false;
2353
13
        size_t before_rep_level_sz = _rep_levels.size();
2354
13
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
2355
13
                                                             &load_rows, &cross_page));
2356
13
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
2357
13
        _filter_map_index += load_rows;
2358
13
        while (cross_page) {
2359
0
            before_rep_level_sz = _rep_levels.size();
2360
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
2361
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
2362
0
        }
2363
13
        *read_rows += load_rows;
2364
13
        _current_row_index += load_rows;
2365
13
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
2366
13
        if (*read_rows == batch_size) {
2367
11
            break;
2368
11
        }
2369
13
    }
2370
13
    *eof = _current_range_idx == _row_ranges.range_size();
2371
13
    return Status::OK();
2372
13
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
_ZN5doris18ScalarColumnReaderILb1ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
Line
Count
Source
2275
3
        size_t* read_rows, bool* eof, bool is_dict_filter) {
2276
3
    _rep_levels.clear();
2277
3
    _def_levels.clear();
2278
2279
    // Handle nullable columns
2280
3
    MutableColumnPtr data_column;
2281
3
    NullMap* map_data_column = nullptr;
2282
3
    if (doris_column->is_nullable()) {
2283
3
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2284
        // doris_column either originates from a mutable block in vparquet_group_reader
2285
        // or is a newly created ColumnPtr, and therefore can be modified.
2286
3
        auto* nullable_column =
2287
3
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
2288
3
        data_column = nullable_column->get_nested_column_ptr();
2289
3
        map_data_column = &(nullable_column->get_null_map_data());
2290
3
    } else {
2291
0
        if (_field_schema->data_type->is_nullable()) {
2292
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2293
0
        }
2294
0
        data_column = doris_column->assume_mutable();
2295
0
    }
2296
2297
3
    std::vector<uint16_t> null_map;
2298
3
    std::unordered_set<size_t> ancestor_null_indices;
2299
3
    std::vector<uint8_t> nested_filter_map_data;
2300
2301
3
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
2302
3
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2303
3
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2304
3
        if (filter_map.has_filter()) {
2305
3
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2306
3
                                           _rep_levels.size(), nested_filter_map_data,
2307
3
                                           &nested_filter_map));
2308
3
        }
2309
2310
3
        null_map.clear();
2311
3
        ancestor_null_indices.clear();
2312
3
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
2313
3
                                            ancestor_null_indices));
2314
2315
3
        ColumnSelectVector select_vector;
2316
3
        {
2317
3
            SCOPED_RAW_TIMER(&_decode_null_map_time);
2318
3
            RETURN_IF_ERROR(select_vector.init(
2319
3
                    null_map,
2320
3
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
2321
3
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
2322
3
        }
2323
2324
3
        RETURN_IF_ERROR(
2325
3
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
2326
3
        if (!ancestor_null_indices.empty()) {
2327
3
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
2328
3
        }
2329
3
        if (filter_map.has_filter()) {
2330
3
            auto new_rep_sz = before_rep_level_sz;
2331
3
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2332
3
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
2333
3
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
2334
3
                    _def_levels[new_rep_sz] = _def_levels[idx];
2335
3
                    new_rep_sz++;
2336
3
                }
2337
3
            }
2338
3
            _rep_levels.resize(new_rep_sz);
2339
3
            _def_levels.resize(new_rep_sz);
2340
3
        }
2341
3
        return Status::OK();
2342
3
    };
2343
2344
3
    while (_current_range_idx < _row_ranges.range_size()) {
2345
3
        size_t left_row =
2346
3
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
2347
3
        size_t right_row = std::min(left_row + batch_size - *read_rows,
2348
3
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
2349
3
        _current_row_index = left_row;
2350
3
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
2351
3
        size_t load_rows = 0;
2352
3
        bool cross_page = false;
2353
3
        size_t before_rep_level_sz = _rep_levels.size();
2354
3
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
2355
3
                                                             &load_rows, &cross_page));
2356
3
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
2357
3
        _filter_map_index += load_rows;
2358
3
        while (cross_page) {
2359
0
            before_rep_level_sz = _rep_levels.size();
2360
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
2361
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
2362
0
        }
2363
3
        *read_rows += load_rows;
2364
3
        _current_row_index += load_rows;
2365
3
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
2366
3
        if (*read_rows == batch_size) {
2367
3
            break;
2368
3
        }
2369
3
    }
2370
3
    *eof = _current_range_idx == _row_ranges.range_size();
2371
3
    return Status::OK();
2372
3
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
_ZN5doris18ScalarColumnReaderILb0ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
Line
Count
Source
2275
10
        size_t* read_rows, bool* eof, bool is_dict_filter) {
2276
10
    _rep_levels.clear();
2277
10
    _def_levels.clear();
2278
2279
    // Handle nullable columns
2280
10
    MutableColumnPtr data_column;
2281
10
    NullMap* map_data_column = nullptr;
2282
10
    if (doris_column->is_nullable()) {
2283
10
        SCOPED_RAW_TIMER(&_decode_null_map_time);
2284
        // doris_column either originates from a mutable block in vparquet_group_reader
2285
        // or is a newly created ColumnPtr, and therefore can be modified.
2286
10
        auto* nullable_column =
2287
10
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
2288
10
        data_column = nullable_column->get_nested_column_ptr();
2289
10
        map_data_column = &(nullable_column->get_null_map_data());
2290
10
    } else {
2291
0
        if (_field_schema->data_type->is_nullable()) {
2292
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2293
0
        }
2294
0
        data_column = doris_column->assume_mutable();
2295
0
    }
2296
2297
10
    std::vector<uint16_t> null_map;
2298
10
    std::unordered_set<size_t> ancestor_null_indices;
2299
10
    std::vector<uint8_t> nested_filter_map_data;
2300
2301
10
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
2302
10
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2303
10
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2304
10
        if (filter_map.has_filter()) {
2305
10
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2306
10
                                           _rep_levels.size(), nested_filter_map_data,
2307
10
                                           &nested_filter_map));
2308
10
        }
2309
2310
10
        null_map.clear();
2311
10
        ancestor_null_indices.clear();
2312
10
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
2313
10
                                            ancestor_null_indices));
2314
2315
10
        ColumnSelectVector select_vector;
2316
10
        {
2317
10
            SCOPED_RAW_TIMER(&_decode_null_map_time);
2318
10
            RETURN_IF_ERROR(select_vector.init(
2319
10
                    null_map,
2320
10
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
2321
10
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
2322
10
        }
2323
2324
10
        RETURN_IF_ERROR(
2325
10
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
2326
10
        if (!ancestor_null_indices.empty()) {
2327
10
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
2328
10
        }
2329
10
        if (filter_map.has_filter()) {
2330
10
            auto new_rep_sz = before_rep_level_sz;
2331
10
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2332
10
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
2333
10
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
2334
10
                    _def_levels[new_rep_sz] = _def_levels[idx];
2335
10
                    new_rep_sz++;
2336
10
                }
2337
10
            }
2338
10
            _rep_levels.resize(new_rep_sz);
2339
10
            _def_levels.resize(new_rep_sz);
2340
10
        }
2341
10
        return Status::OK();
2342
10
    };
2343
2344
12
    while (_current_range_idx < _row_ranges.range_size()) {
2345
10
        size_t left_row =
2346
10
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
2347
10
        size_t right_row = std::min(left_row + batch_size - *read_rows,
2348
10
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
2349
10
        _current_row_index = left_row;
2350
10
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
2351
10
        size_t load_rows = 0;
2352
10
        bool cross_page = false;
2353
10
        size_t before_rep_level_sz = _rep_levels.size();
2354
10
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
2355
10
                                                             &load_rows, &cross_page));
2356
10
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
2357
10
        _filter_map_index += load_rows;
2358
10
        while (cross_page) {
2359
0
            before_rep_level_sz = _rep_levels.size();
2360
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
2361
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
2362
0
        }
2363
10
        *read_rows += load_rows;
2364
10
        _current_row_index += load_rows;
2365
10
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
2366
10
        if (*read_rows == batch_size) {
2367
8
            break;
2368
8
        }
2369
10
    }
2370
10
    *eof = _current_range_idx == _row_ranges.range_size();
2371
10
    return Status::OK();
2372
10
}
2373
2374
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2375
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_and_skip_nested_levels(
2376
        FilterMap& filter_map, size_t before_rep_level_sz, size_t filter_map_index,
2377
0
        std::vector<uint8_t>& nested_filter_map_data) {
2378
0
    RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
2379
0
    RETURN_IF_ERROR(_chunk_reader->skip_nested_values(_def_levels, before_rep_level_sz,
2380
0
                                                      _def_levels.size()));
2381
0
    if (!filter_map.has_filter()) {
2382
0
        return Status::OK();
2383
0
    }
2384
2385
0
    std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
2386
0
    RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
2387
0
                                   _rep_levels.size(), nested_filter_map_data, &nested_filter_map));
2388
0
    auto new_rep_sz = before_rep_level_sz;
2389
0
    for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
2390
0
        if (nested_filter_map_data[idx - before_rep_level_sz]) {
2391
0
            _rep_levels[new_rep_sz] = _rep_levels[idx];
2392
0
            _def_levels[new_rep_sz] = _def_levels[idx];
2393
0
            new_rep_sz++;
2394
0
        }
2395
0
    }
2396
0
    _rep_levels.resize(new_rep_sz);
2397
0
    _def_levels.resize(new_rep_sz);
2398
0
    return Status::OK();
2399
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE28_read_and_skip_nested_levelsERNS_9FilterMapEmmRSt6vectorIhSaIhEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE28_read_and_skip_nested_levelsERNS_9FilterMapEmmRSt6vectorIhSaIhEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE28_read_and_skip_nested_levelsERNS_9FilterMapEmmRSt6vectorIhSaIhEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE28_read_and_skip_nested_levelsERNS_9FilterMapEmmRSt6vectorIhSaIhEE
2400
2401
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2402
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_nested_levels(FilterMap& filter_map,
2403
                                                                           size_t batch_size,
2404
                                                                           size_t* read_rows,
2405
0
                                                                           bool* eof) {
2406
0
    _rep_levels.clear();
2407
0
    _def_levels.clear();
2408
0
    *read_rows = 0;
2409
2410
0
    std::vector<uint8_t> nested_filter_map_data;
2411
2412
0
    while (_current_range_idx < _row_ranges.range_size()) {
2413
0
        size_t left_row =
2414
0
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
2415
0
        size_t right_row = std::min(left_row + batch_size - *read_rows,
2416
0
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
2417
0
        _current_row_index = left_row;
2418
0
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
2419
0
        size_t load_rows = 0;
2420
0
        bool cross_page = false;
2421
0
        size_t before_rep_level_sz = _rep_levels.size();
2422
0
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
2423
0
                                                             &load_rows, &cross_page));
2424
0
        RETURN_IF_ERROR(_read_and_skip_nested_levels(filter_map, before_rep_level_sz,
2425
0
                                                     _filter_map_index, nested_filter_map_data));
2426
0
        _filter_map_index += load_rows;
2427
0
        while (cross_page) {
2428
0
            before_rep_level_sz = _rep_levels.size();
2429
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
2430
0
            RETURN_IF_ERROR(_read_and_skip_nested_levels(filter_map, before_rep_level_sz,
2431
0
                                                         _filter_map_index - 1,
2432
0
                                                         nested_filter_map_data));
2433
0
        }
2434
0
        *read_rows += load_rows;
2435
0
        _current_row_index += load_rows;
2436
0
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
2437
0
        if (*read_rows == batch_size) {
2438
0
            break;
2439
0
        }
2440
0
    }
2441
0
    *eof = _current_range_idx == _row_ranges.range_size();
2442
0
    return Status::OK();
2443
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE18read_nested_levelsERNS_9FilterMapEmPmPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE18read_nested_levelsERNS_9FilterMapEmPmPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE18read_nested_levelsERNS_9FilterMapEmPmPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE18read_nested_levelsERNS_9FilterMapEmPmPb
2444
2445
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2446
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_dict_values_to_column(
2447
2
        MutableColumnPtr& doris_column, bool* has_dict) {
2448
2
    bool loaded;
2449
2
    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
2450
2
    if (loaded && *has_dict) {
2451
2
        return _chunk_reader->read_dict_values_to_column(doris_column);
2452
2
    }
2453
0
    return Status::OK();
2454
2
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
_ZN5doris18ScalarColumnReaderILb0ELb0EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Line
Count
Source
2447
2
        MutableColumnPtr& doris_column, bool* has_dict) {
2448
2
    bool loaded;
2449
2
    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
2450
2
    if (loaded && *has_dict) {
2451
2
        return _chunk_reader->read_dict_values_to_column(doris_column);
2452
2
    }
2453
0
    return Status::OK();
2454
2
}
2455
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2456
Result<MutableColumnPtr>
2457
ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::convert_dict_column_to_string_column(
2458
0
        const ColumnInt32* dict_column) {
2459
0
    return _chunk_reader->convert_dict_column_to_string_column(dict_column);
2460
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
2461
2462
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2463
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_try_load_dict_page(bool* loaded,
2464
2
                                                                            bool* has_dict) {
2465
    // _chunk_reader init will load first page header to check whether has dict page
2466
2
    *loaded = true;
2467
2
    *has_dict = _chunk_reader->has_dict();
2468
2
    return Status::OK();
2469
2
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19_try_load_dict_pageEPbS2_
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE19_try_load_dict_pageEPbS2_
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19_try_load_dict_pageEPbS2_
_ZN5doris18ScalarColumnReaderILb0ELb0EE19_try_load_dict_pageEPbS2_
Line
Count
Source
2464
2
                                                                            bool* has_dict) {
2465
    // _chunk_reader init will load first page header to check whether has dict page
2466
2
    *loaded = true;
2467
2
    *has_dict = _chunk_reader->has_dict();
2468
2
    return Status::OK();
2469
2
}
2470
2471
template <bool IN_COLLECTION, bool OFFSET_INDEX>
2472
// Existing scalar read path handles page iteration, filtering, and conversion in one dispatch loop.
2473
// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
2474
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
2475
        ColumnPtr& doris_column, const DataTypePtr& type,
2476
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2477
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
2478
287
        int64_t real_column_size) {
2479
287
    if (_converter == nullptr) {
2480
114
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
2481
114
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
2482
114
        if (!_converter->support()) {
2483
0
            return Status::InternalError(
2484
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
2485
0
                    "src_logical_type: {}, dst_logical_type: {}",
2486
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
2487
0
                    _field_schema->data_type->get_name(), type->get_name());
2488
0
        }
2489
114
    }
2490
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
2491
287
    ColumnPtr resolved_column =
2492
287
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
2493
287
                                            doris_column, type, is_dict_filter);
2494
287
    DataTypePtr& resolved_type = _converter->get_physical_type();
2495
2496
287
    _def_levels.clear();
2497
287
    _rep_levels.clear();
2498
287
    *read_rows = 0;
2499
2500
287
    if (_in_nested) {
2501
13
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
2502
13
                                            read_rows, eof, is_dict_filter));
2503
13
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
2504
13
                                   is_dict_filter);
2505
13
    }
2506
2507
274
    int64_t right_row = 0;
2508
274
    if constexpr (OFFSET_INDEX == false) {
2509
274
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2510
274
        right_row = _chunk_reader->page_end_row();
2511
274
    } else {
2512
0
        right_row = _chunk_reader->page_end_row();
2513
0
    }
2514
2515
274
    do {
2516
        // generate the row ranges that should be read
2517
274
        RowRanges read_ranges;
2518
274
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
2519
274
        if (read_ranges.count() == 0) {
2520
            // skip the whole page
2521
63
            _current_row_index = right_row;
2522
211
        } else {
2523
211
            bool skip_whole_batch = false;
2524
            // Determining whether to skip page or batch will increase the calculation time.
2525
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
2526
211
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
2527
                // lazy read
2528
0
                size_t remaining_num_values = read_ranges.count();
2529
0
                if (batch_size >= remaining_num_values &&
2530
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
2531
                    // We can skip the whole page if the remaining values are filtered by predicate columns
2532
0
                    _filter_map_index += remaining_num_values;
2533
0
                    _current_row_index = right_row;
2534
0
                    *read_rows = remaining_num_values;
2535
0
                    break;
2536
0
                }
2537
0
                skip_whole_batch = batch_size <= remaining_num_values &&
2538
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
2539
0
                if (skip_whole_batch) {
2540
0
                    _filter_map_index += batch_size;
2541
0
                }
2542
0
            }
2543
            // load page data to decode or skip values
2544
211
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2545
211
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
2546
211
            size_t has_read = 0;
2547
344
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
2548
244
                auto range = read_ranges.get_range(idx);
2549
                // generate the skipped values
2550
244
                size_t skip_values = range.from() - _current_row_index;
2551
244
                RETURN_IF_ERROR(_skip_values(skip_values));
2552
244
                _current_row_index += skip_values;
2553
                // generate the read values
2554
244
                size_t read_values =
2555
244
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
2556
244
                if (skip_whole_batch) {
2557
0
                    RETURN_IF_ERROR(_skip_values(read_values));
2558
244
                } else {
2559
244
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
2560
244
                                                 filter_map, is_dict_filter));
2561
244
                }
2562
244
                has_read += read_values;
2563
244
                *read_rows += read_values;
2564
244
                _current_row_index += read_values;
2565
244
                if (has_read == batch_size) {
2566
111
                    break;
2567
111
                }
2568
244
            }
2569
211
        }
2570
274
    } while (false);
2571
2572
274
    if (right_row == _current_row_index) {
2573
101
        if (!_chunk_reader->has_next_page()) {
2574
101
            *eof = true;
2575
101
        } else {
2576
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
2577
0
        }
2578
101
    }
2579
2580
274
    {
2581
274
        SCOPED_RAW_TIMER(&_convert_time);
2582
274
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
2583
274
                                            doris_column, is_dict_filter));
2584
274
    }
2585
274
    return Status::OK();
2586
274
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
_ZN5doris18ScalarColumnReaderILb1ELb0EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
Line
Count
Source
2478
3
        int64_t real_column_size) {
2479
3
    if (_converter == nullptr) {
2480
3
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
2481
3
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
2482
3
        if (!_converter->support()) {
2483
0
            return Status::InternalError(
2484
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
2485
0
                    "src_logical_type: {}, dst_logical_type: {}",
2486
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
2487
0
                    _field_schema->data_type->get_name(), type->get_name());
2488
0
        }
2489
3
    }
2490
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
2491
3
    ColumnPtr resolved_column =
2492
3
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
2493
3
                                            doris_column, type, is_dict_filter);
2494
3
    DataTypePtr& resolved_type = _converter->get_physical_type();
2495
2496
3
    _def_levels.clear();
2497
3
    _rep_levels.clear();
2498
3
    *read_rows = 0;
2499
2500
3
    if (_in_nested) {
2501
3
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
2502
3
                                            read_rows, eof, is_dict_filter));
2503
3
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
2504
3
                                   is_dict_filter);
2505
3
    }
2506
2507
0
    int64_t right_row = 0;
2508
0
    if constexpr (OFFSET_INDEX == false) {
2509
0
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2510
0
        right_row = _chunk_reader->page_end_row();
2511
    } else {
2512
        right_row = _chunk_reader->page_end_row();
2513
    }
2514
2515
0
    do {
2516
        // generate the row ranges that should be read
2517
0
        RowRanges read_ranges;
2518
0
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
2519
0
        if (read_ranges.count() == 0) {
2520
            // skip the whole page
2521
0
            _current_row_index = right_row;
2522
0
        } else {
2523
0
            bool skip_whole_batch = false;
2524
            // Determining whether to skip page or batch will increase the calculation time.
2525
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
2526
0
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
2527
                // lazy read
2528
0
                size_t remaining_num_values = read_ranges.count();
2529
0
                if (batch_size >= remaining_num_values &&
2530
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
2531
                    // We can skip the whole page if the remaining values are filtered by predicate columns
2532
0
                    _filter_map_index += remaining_num_values;
2533
0
                    _current_row_index = right_row;
2534
0
                    *read_rows = remaining_num_values;
2535
0
                    break;
2536
0
                }
2537
0
                skip_whole_batch = batch_size <= remaining_num_values &&
2538
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
2539
0
                if (skip_whole_batch) {
2540
0
                    _filter_map_index += batch_size;
2541
0
                }
2542
0
            }
2543
            // load page data to decode or skip values
2544
0
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2545
0
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
2546
0
            size_t has_read = 0;
2547
0
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
2548
0
                auto range = read_ranges.get_range(idx);
2549
                // generate the skipped values
2550
0
                size_t skip_values = range.from() - _current_row_index;
2551
0
                RETURN_IF_ERROR(_skip_values(skip_values));
2552
0
                _current_row_index += skip_values;
2553
                // generate the read values
2554
0
                size_t read_values =
2555
0
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
2556
0
                if (skip_whole_batch) {
2557
0
                    RETURN_IF_ERROR(_skip_values(read_values));
2558
0
                } else {
2559
0
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
2560
0
                                                 filter_map, is_dict_filter));
2561
0
                }
2562
0
                has_read += read_values;
2563
0
                *read_rows += read_values;
2564
0
                _current_row_index += read_values;
2565
0
                if (has_read == batch_size) {
2566
0
                    break;
2567
0
                }
2568
0
            }
2569
0
        }
2570
0
    } while (false);
2571
2572
0
    if (right_row == _current_row_index) {
2573
0
        if (!_chunk_reader->has_next_page()) {
2574
0
            *eof = true;
2575
0
        } else {
2576
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
2577
0
        }
2578
0
    }
2579
2580
0
    {
2581
0
        SCOPED_RAW_TIMER(&_convert_time);
2582
0
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
2583
0
                                            doris_column, is_dict_filter));
2584
0
    }
2585
0
    return Status::OK();
2586
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
_ZN5doris18ScalarColumnReaderILb0ELb0EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
Line
Count
Source
2478
284
        int64_t real_column_size) {
2479
284
    if (_converter == nullptr) {
2480
111
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
2481
111
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
2482
111
        if (!_converter->support()) {
2483
0
            return Status::InternalError(
2484
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
2485
0
                    "src_logical_type: {}, dst_logical_type: {}",
2486
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
2487
0
                    _field_schema->data_type->get_name(), type->get_name());
2488
0
        }
2489
111
    }
2490
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
2491
284
    ColumnPtr resolved_column =
2492
284
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
2493
284
                                            doris_column, type, is_dict_filter);
2494
284
    DataTypePtr& resolved_type = _converter->get_physical_type();
2495
2496
284
    _def_levels.clear();
2497
284
    _rep_levels.clear();
2498
284
    *read_rows = 0;
2499
2500
284
    if (_in_nested) {
2501
10
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
2502
10
                                            read_rows, eof, is_dict_filter));
2503
10
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
2504
10
                                   is_dict_filter);
2505
10
    }
2506
2507
274
    int64_t right_row = 0;
2508
274
    if constexpr (OFFSET_INDEX == false) {
2509
274
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2510
274
        right_row = _chunk_reader->page_end_row();
2511
    } else {
2512
        right_row = _chunk_reader->page_end_row();
2513
    }
2514
2515
274
    do {
2516
        // generate the row ranges that should be read
2517
274
        RowRanges read_ranges;
2518
274
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
2519
274
        if (read_ranges.count() == 0) {
2520
            // skip the whole page
2521
63
            _current_row_index = right_row;
2522
211
        } else {
2523
211
            bool skip_whole_batch = false;
2524
            // Determining whether to skip page or batch will increase the calculation time.
2525
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
2526
211
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
2527
                // lazy read
2528
0
                size_t remaining_num_values = read_ranges.count();
2529
0
                if (batch_size >= remaining_num_values &&
2530
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
2531
                    // We can skip the whole page if the remaining values are filtered by predicate columns
2532
0
                    _filter_map_index += remaining_num_values;
2533
0
                    _current_row_index = right_row;
2534
0
                    *read_rows = remaining_num_values;
2535
0
                    break;
2536
0
                }
2537
0
                skip_whole_batch = batch_size <= remaining_num_values &&
2538
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
2539
0
                if (skip_whole_batch) {
2540
0
                    _filter_map_index += batch_size;
2541
0
                }
2542
0
            }
2543
            // load page data to decode or skip values
2544
211
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
2545
211
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
2546
211
            size_t has_read = 0;
2547
344
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
2548
244
                auto range = read_ranges.get_range(idx);
2549
                // generate the skipped values
2550
244
                size_t skip_values = range.from() - _current_row_index;
2551
244
                RETURN_IF_ERROR(_skip_values(skip_values));
2552
244
                _current_row_index += skip_values;
2553
                // generate the read values
2554
244
                size_t read_values =
2555
244
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
2556
244
                if (skip_whole_batch) {
2557
0
                    RETURN_IF_ERROR(_skip_values(read_values));
2558
244
                } else {
2559
244
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
2560
244
                                                 filter_map, is_dict_filter));
2561
244
                }
2562
244
                has_read += read_values;
2563
244
                *read_rows += read_values;
2564
244
                _current_row_index += read_values;
2565
244
                if (has_read == batch_size) {
2566
111
                    break;
2567
111
                }
2568
244
            }
2569
211
        }
2570
274
    } while (false);
2571
2572
274
    if (right_row == _current_row_index) {
2573
101
        if (!_chunk_reader->has_next_page()) {
2574
101
            *eof = true;
2575
101
        } else {
2576
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
2577
0
        }
2578
101
    }
2579
2580
274
    {
2581
274
        SCOPED_RAW_TIMER(&_convert_time);
2582
274
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
2583
274
                                            doris_column, is_dict_filter));
2584
274
    }
2585
274
    return Status::OK();
2586
274
}
2587
2588
Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_reader,
2589
2
                               FieldSchema* field, bool offset_only) {
2590
2
    _field_schema = field;
2591
2
    _element_reader = std::move(element_reader);
2592
2
    _offset_only = offset_only;
2593
2
    return Status::OK();
2594
2
}
2595
2596
Status ArrayColumnReader::read_column_data(
2597
        ColumnPtr& doris_column, const DataTypePtr& type,
2598
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2599
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
2600
2
        int64_t real_column_size) {
2601
2
    MutableColumnPtr data_column;
2602
2
    NullMap* null_map_ptr = nullptr;
2603
2
    if (doris_column->is_nullable()) {
2604
2
        auto mutable_column = doris_column->assume_mutable();
2605
2
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
2606
2
        null_map_ptr = &nullable_column->get_null_map_data();
2607
2
        data_column = nullable_column->get_nested_column_ptr();
2608
2
    } else {
2609
0
        if (_field_schema->data_type->is_nullable()) {
2610
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2611
0
        }
2612
0
        data_column = doris_column->assume_mutable();
2613
0
    }
2614
2
    if (type->get_primitive_type() != PrimitiveType::TYPE_ARRAY) {
2615
0
        return Status::Corruption(
2616
0
                "Wrong data type for column '{}', expected Array type, actual type: {}.",
2617
0
                _field_schema->name, type->get_name());
2618
0
    }
2619
2620
2
    ColumnPtr& element_column = assert_cast<ColumnArray&>(*data_column).get_data_ptr();
2621
2
    const DataTypePtr& element_type =
2622
2
            (assert_cast<const DataTypeArray*>(remove_nullable(type).get()))->get_nested_type();
2623
2
    if (_offset_only) {
2624
        // Cardinality needs collection levels and offsets, but not element payloads.
2625
0
        RETURN_IF_ERROR(
2626
0
                _element_reader->read_nested_levels(filter_map, batch_size, read_rows, eof));
2627
2
    } else {
2628
2
        RETURN_IF_ERROR(_element_reader->read_column_data(
2629
2
                element_column, element_type, root_node->get_element_node(), filter_map, batch_size,
2630
2
                read_rows, eof, is_dict_filter));
2631
2
    }
2632
2
    if (*read_rows == 0) {
2633
0
        return Status::OK();
2634
0
    }
2635
2636
2
    ColumnArray::Offsets64& offsets_data = assert_cast<ColumnArray&>(*data_column).get_offsets();
2637
    // fill offset and null map
2638
2
    fill_array_offset(_field_schema, offsets_data, null_map_ptr, _element_reader->get_rep_level(),
2639
2
                      _element_reader->get_def_level());
2640
2
    if (_offset_only && offsets_data.back() > element_column->size()) {
2641
0
        auto mutable_element_column = element_column->assume_mutable();
2642
0
        mutable_element_column->insert_many_defaults(offsets_data.back() - element_column->size());
2643
0
        element_column = std::move(mutable_element_column);
2644
0
    }
2645
2
    DCHECK_EQ(element_column->size(), offsets_data.back());
2646
2
#ifndef NDEBUG
2647
2
    doris_column->sanity_check();
2648
2
#endif
2649
2
    return Status::OK();
2650
2
}
2651
2652
Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
2653
                             std::unique_ptr<ParquetColumnReader> value_reader,
2654
0
                             FieldSchema* field) {
2655
0
    _field_schema = field;
2656
0
    _key_reader = std::move(key_reader);
2657
0
    _value_reader = std::move(value_reader);
2658
0
    return Status::OK();
2659
0
}
2660
2661
Status MapColumnReader::read_column_data(
2662
        ColumnPtr& doris_column, const DataTypePtr& type,
2663
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2664
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
2665
0
        int64_t real_column_size) {
2666
0
    MutableColumnPtr data_column;
2667
0
    NullMap* null_map_ptr = nullptr;
2668
0
    if (doris_column->is_nullable()) {
2669
0
        auto mutable_column = doris_column->assume_mutable();
2670
0
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
2671
0
        null_map_ptr = &nullable_column->get_null_map_data();
2672
0
        data_column = nullable_column->get_nested_column_ptr();
2673
0
    } else {
2674
0
        if (_field_schema->data_type->is_nullable()) {
2675
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2676
0
        }
2677
0
        data_column = doris_column->assume_mutable();
2678
0
    }
2679
0
    if (remove_nullable(type)->get_primitive_type() != PrimitiveType::TYPE_MAP) {
2680
0
        return Status::Corruption(
2681
0
                "Wrong data type for column '{}', expected Map type, actual type id {}.",
2682
0
                _field_schema->name, type->get_name());
2683
0
    }
2684
2685
0
    auto& map = assert_cast<ColumnMap&>(*data_column);
2686
0
    const DataTypePtr& key_type =
2687
0
            assert_cast<const DataTypeMap*>(remove_nullable(type).get())->get_key_type();
2688
0
    const DataTypePtr& value_type =
2689
0
            assert_cast<const DataTypeMap*>(remove_nullable(type).get())->get_value_type();
2690
0
    ColumnPtr& key_column = map.get_keys_ptr();
2691
0
    ColumnPtr& value_column = map.get_values_ptr();
2692
2693
0
    size_t key_rows = 0;
2694
0
    size_t value_rows = 0;
2695
0
    bool key_eof = false;
2696
0
    bool value_eof = false;
2697
0
    int64_t orig_col_column_size = key_column->size();
2698
2699
0
    RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, root_node->get_key_node(),
2700
0
                                                  filter_map, batch_size, &key_rows, &key_eof,
2701
0
                                                  is_dict_filter));
2702
2703
0
    while (value_rows < key_rows && !value_eof) {
2704
0
        size_t loop_rows = 0;
2705
0
        RETURN_IF_ERROR(_value_reader->read_column_data(
2706
0
                value_column, value_type, root_node->get_value_node(), filter_map,
2707
0
                key_rows - value_rows, &loop_rows, &value_eof, is_dict_filter,
2708
0
                key_column->size() - orig_col_column_size));
2709
0
        value_rows += loop_rows;
2710
0
    }
2711
0
    DCHECK_EQ(key_rows, value_rows);
2712
0
    *read_rows = key_rows;
2713
0
    *eof = key_eof;
2714
2715
0
    if (*read_rows == 0) {
2716
0
        return Status::OK();
2717
0
    }
2718
2719
0
    DCHECK_EQ(key_column->size(), value_column->size());
2720
    // fill offset and null map
2721
0
    fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(),
2722
0
                      _key_reader->get_def_level());
2723
0
    DCHECK_EQ(key_column->size(), map.get_offsets().back());
2724
0
#ifndef NDEBUG
2725
0
    doris_column->sanity_check();
2726
0
#endif
2727
0
    return Status::OK();
2728
0
}
2729
2730
Status StructColumnReader::init(
2731
        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
2732
11
        FieldSchema* field) {
2733
11
    _field_schema = field;
2734
11
    _child_readers = std::move(child_readers);
2735
11
    return Status::OK();
2736
11
}
2737
2738
Status StructColumnReader::read_nested_levels(FilterMap& filter_map, size_t batch_size,
2739
0
                                              size_t* read_rows, bool* eof) {
2740
0
    _read_column_names.clear();
2741
0
    for (const auto& child : _field_schema->children) {
2742
0
        auto it = _child_readers.find(child.name);
2743
0
        if (it == _child_readers.end() ||
2744
0
            dynamic_cast<SkipReadingReader*>(it->second.get()) != nullptr) {
2745
0
            continue;
2746
0
        }
2747
0
        _read_column_names.emplace_back(child.name);
2748
0
        return it->second->read_nested_levels(filter_map, batch_size, read_rows, eof);
2749
0
    }
2750
0
    return Status::Corruption("Cannot read struct '{}' levels without a reference column",
2751
0
                              _field_schema->name);
2752
0
}
2753
2754
// Existing struct reader coordinates child readers, missing columns, and selection state.
2755
// NOLINTNEXTLINE(readability-function-cognitive-complexity,readability-function-size)
2756
Status StructColumnReader::read_column_data(
2757
        ColumnPtr& doris_column, const DataTypePtr& type,
2758
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2759
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
2760
11
        int64_t real_column_size) {
2761
11
    MutableColumnPtr data_column;
2762
11
    NullMap* null_map_ptr = nullptr;
2763
11
    if (doris_column->is_nullable()) {
2764
11
        auto mutable_column = doris_column->assume_mutable();
2765
11
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
2766
11
        null_map_ptr = &nullable_column->get_null_map_data();
2767
11
        data_column = nullable_column->get_nested_column_ptr();
2768
11
    } else {
2769
0
        if (_field_schema->data_type->is_nullable()) {
2770
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2771
0
        }
2772
0
        data_column = doris_column->assume_mutable();
2773
0
    }
2774
11
    if (type->get_primitive_type() != PrimitiveType::TYPE_STRUCT) {
2775
0
        return Status::Corruption(
2776
0
                "Wrong data type for column '{}', expected Struct type, actual type id {}.",
2777
0
                _field_schema->name, type->get_name());
2778
0
    }
2779
2780
11
    auto& doris_struct = assert_cast<ColumnStruct&>(*data_column);
2781
11
    const auto* doris_struct_type = assert_cast<const DataTypeStruct*>(remove_nullable(type).get());
2782
2783
11
    int64_t not_missing_column_id = -1;
2784
11
    size_t not_missing_orig_column_size = 0;
2785
11
    std::vector<size_t> missing_column_idxs {};
2786
11
    std::vector<size_t> skip_reading_column_idxs {};
2787
2788
11
    _read_column_names.clear();
2789
2790
37
    for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
2791
26
        ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
2792
26
        const auto& doris_type = doris_struct_type->get_element(i);
2793
26
        const auto& doris_name = doris_struct_type->get_element_name(i);
2794
26
        if (!root_node->children_column_exists(doris_name)) {
2795
0
            missing_column_idxs.push_back(i);
2796
0
            VLOG_DEBUG << "[ParquetReader] Missing column in schema: column_idx[" << i
2797
0
                       << "], doris_name: " << doris_name << " (column not exists in root node)";
2798
0
            continue;
2799
0
        }
2800
26
        auto file_name = root_node->children_file_column_name(doris_name);
2801
2802
        // Check if this is a SkipReadingReader - we should skip it when choosing reference column
2803
        // because SkipReadingReader doesn't know the actual data size in nested context
2804
26
        bool is_skip_reader =
2805
26
                dynamic_cast<SkipReadingReader*>(_child_readers[file_name].get()) != nullptr;
2806
2807
26
        if (is_skip_reader) {
2808
            // Store SkipReadingReader columns to fill them later based on reference column size
2809
4
            skip_reading_column_idxs.push_back(i);
2810
4
            continue;
2811
4
        }
2812
2813
        // Only add non-SkipReadingReader columns to _read_column_names
2814
        // This ensures get_rep_level() and get_def_level() return valid levels
2815
22
        _read_column_names.emplace_back(file_name);
2816
2817
22
        size_t field_rows = 0;
2818
22
        bool field_eof = false;
2819
22
        if (not_missing_column_id == -1) {
2820
11
            not_missing_column_id = i;
2821
11
            not_missing_orig_column_size = doris_field->size();
2822
11
            RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
2823
11
                    doris_field, doris_type, root_node->get_children_node(doris_name), filter_map,
2824
11
                    batch_size, &field_rows, &field_eof, is_dict_filter));
2825
11
            *read_rows = field_rows;
2826
11
            *eof = field_eof;
2827
            /*
2828
             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
2829
             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
2830
             * crucial to use the definition and repetition levels from the first read column
2831
             * (since `_read_nested_column` is not called repeatedly).
2832
             *
2833
             *  It is worth mentioning that, theoretically, any sub-column can be chosen to fill the null_map,
2834
             *  and selecting the shortest one will offer better performance
2835
             */
2836
11
        } else {
2837
22
            while (field_rows < *read_rows && !field_eof) {
2838
11
                size_t loop_rows = 0;
2839
11
                RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
2840
11
                        doris_field, doris_type, root_node->get_children_node(doris_name),
2841
11
                        filter_map, *read_rows - field_rows, &loop_rows, &field_eof,
2842
11
                        is_dict_filter));
2843
11
                field_rows += loop_rows;
2844
11
            }
2845
11
            DCHECK_EQ(*read_rows, field_rows);
2846
            //            DCHECK_EQ(*eof, field_eof);
2847
11
        }
2848
22
    }
2849
2850
11
    int64_t missing_column_sz = -1;
2851
2852
11
    if (not_missing_column_id == -1) {
2853
        // All queried columns are missing in the file (e.g., all added after schema change)
2854
        // We need to pick a column from _field_schema children that exists in the file for RL/DL reference
2855
0
        std::string reference_file_column_name;
2856
0
        std::unique_ptr<ParquetColumnReader>* reference_reader = nullptr;
2857
2858
0
        for (const auto& child : _field_schema->children) {
2859
0
            auto it = _child_readers.find(child.name);
2860
0
            if (it != _child_readers.end()) {
2861
                // Skip SkipReadingReader as they don't have valid RL/DL
2862
0
                bool is_skip_reader = dynamic_cast<SkipReadingReader*>(it->second.get()) != nullptr;
2863
0
                if (!is_skip_reader) {
2864
0
                    reference_file_column_name = child.name;
2865
0
                    reference_reader = &(it->second);
2866
0
                    break;
2867
0
                }
2868
0
            }
2869
0
        }
2870
2871
0
        if (reference_reader != nullptr) {
2872
            // Read the reference column to get correct RL/DL information
2873
            // TODO: Optimize by only reading RL/DL without actual data decoding
2874
2875
            // We need to find the FieldSchema for the reference column from _field_schema children
2876
0
            FieldSchema* ref_field_schema = nullptr;
2877
0
            for (auto& child : _field_schema->children) {
2878
0
                if (child.name == reference_file_column_name) {
2879
0
                    ref_field_schema = &child;
2880
0
                    break;
2881
0
                }
2882
0
            }
2883
2884
0
            if (ref_field_schema == nullptr) {
2885
0
                return Status::InternalError(
2886
0
                        "Cannot find field schema for reference column '{}' in struct '{}'",
2887
0
                        reference_file_column_name, _field_schema->name);
2888
0
            }
2889
2890
            // Create a temporary column to hold the data (we'll use its size for missing_column_sz)
2891
0
            ColumnPtr temp_column = ref_field_schema->data_type->create_column();
2892
0
            auto temp_type = ref_field_schema->data_type;
2893
2894
0
            size_t field_rows = 0;
2895
0
            bool field_eof = false;
2896
2897
            // Use ConstNode for the reference column instead of looking up from root_node.
2898
            // The reference column is only used to get RL/DL information for determining the number
2899
            // of elements in the struct. It may be a column that has been dropped from the table
2900
            // schema (e.g., 'removed' field), but still exists in older parquet files.
2901
            // Since we don't need schema mapping for this column (we just need its RL/DL levels),
2902
            // using ConstNode is safe and avoids the issue where the reference column doesn't exist
2903
            // in root_node (because it was dropped from table schema).
2904
0
            auto ref_child_node = TableSchemaChangeHelper::ConstNode::get_instance();
2905
0
            not_missing_orig_column_size = temp_column->size();
2906
2907
0
            RETURN_IF_ERROR((*reference_reader)
2908
0
                                    ->read_column_data(temp_column, temp_type, ref_child_node,
2909
0
                                                       filter_map, batch_size, &field_rows,
2910
0
                                                       &field_eof, is_dict_filter));
2911
2912
0
            *read_rows = field_rows;
2913
0
            *eof = field_eof;
2914
2915
            // Store this reference column name for get_rep_level/get_def_level to use
2916
0
            _read_column_names.emplace_back(reference_file_column_name);
2917
2918
0
            missing_column_sz = temp_column->size() - not_missing_orig_column_size;
2919
0
        } else {
2920
0
            return Status::Corruption(
2921
0
                    "Cannot read struct '{}': all queried columns are missing and no reference "
2922
0
                    "column found in file",
2923
0
                    _field_schema->name);
2924
0
        }
2925
0
    }
2926
2927
    //  This missing_column_sz is not *read_rows. Because read_rows returns the number of rows.
2928
    //  For example: suppose we have a column array<struct<a:int,b:string>>,
2929
    //  where b is a newly added column, that is, a missing column.
2930
    //  There are two rows of data in this column,
2931
    //      [{1,null},{2,null},{3,null}]
2932
    //      [{4,null},{5,null}]
2933
    //  When you first read subcolumn a, you read 5 data items and the value of *read_rows is 2.
2934
    //  You should insert 5 records into subcolumn b instead of 2.
2935
11
    if (missing_column_sz == -1) {
2936
11
        missing_column_sz = doris_struct.get_column(not_missing_column_id).size() -
2937
11
                            not_missing_orig_column_size;
2938
11
    }
2939
2940
    // Fill SkipReadingReader columns with the correct amount of data based on reference column
2941
    // Let SkipReadingReader handle the data filling through its read_column_data method
2942
11
    for (auto idx : skip_reading_column_idxs) {
2943
4
        auto& doris_field = doris_struct.get_column_ptr(idx);
2944
4
        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
2945
4
        auto& doris_name = const_cast<String&>(doris_struct_type->get_element_name(idx));
2946
4
        auto file_name = root_node->children_file_column_name(doris_name);
2947
2948
4
        size_t field_rows = 0;
2949
4
        bool field_eof = false;
2950
4
        RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
2951
4
                doris_field, doris_type, root_node->get_children_node(doris_name), filter_map,
2952
4
                missing_column_sz, &field_rows, &field_eof, is_dict_filter, missing_column_sz));
2953
4
    }
2954
2955
    // Fill truly missing columns (not in root_node) with null or default value
2956
11
    for (auto idx : missing_column_idxs) {
2957
0
        auto& doris_field = doris_struct.get_column_ptr(idx);
2958
0
        const auto& doris_type = doris_struct_type->get_element(idx);
2959
0
        DCHECK(doris_type->is_nullable());
2960
0
        auto mutable_column = doris_field->assume_mutable();
2961
0
        auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
2962
0
        nullable_column->insert_many_defaults(missing_column_sz);
2963
0
    }
2964
2965
11
    if (null_map_ptr != nullptr) {
2966
11
        fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
2967
11
                             this->get_def_level());
2968
11
    }
2969
11
#ifndef NDEBUG
2970
11
    doris_column->sanity_check();
2971
11
#endif
2972
11
    return Status::OK();
2973
11
}
2974
2975
Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
2976
                                 const tparquet::RowGroup& row_group, size_t max_buf_size,
2977
                                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
2978
                                 RuntimeState* state, bool in_collection,
2979
                                 const std::set<uint64_t>& column_ids,
2980
0
                                 const std::set<uint64_t>& filter_column_ids) {
2981
0
    _field_schema = field;
2982
0
    _column_ids = column_ids;
2983
0
    _variant_struct_field = std::make_unique<FieldSchema>(*field);
2984
2985
0
    DataTypePtr variant_struct_type = make_variant_struct_reader_type(*field);
2986
0
    _variant_struct_field->data_type = variant_struct_type;
2987
2988
0
    RETURN_IF_ERROR(ParquetColumnReader::create(file, _variant_struct_field.get(), row_group,
2989
0
                                                _row_ranges, _ctz, _io_ctx, _struct_reader,
2990
0
                                                max_buf_size, col_offsets, state, in_collection,
2991
0
                                                column_ids, filter_column_ids));
2992
0
    _struct_reader->set_column_in_nested();
2993
0
    return Status::OK();
2994
0
}
2995
2996
Status VariantColumnReader::read_column_data(
2997
        ColumnPtr& doris_column, const DataTypePtr& type,
2998
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2999
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
3000
0
        int64_t real_column_size) {
3001
0
    (void)root_node;
3002
0
    if (remove_nullable(type)->get_primitive_type() != PrimitiveType::TYPE_VARIANT) {
3003
0
        return Status::Corruption(
3004
0
                "Wrong data type for column '{}', expected Variant type, actual type: {}.",
3005
0
                _field_schema->name, type->get_name());
3006
0
    }
3007
3008
0
    const auto& variant_struct_type = _variant_struct_field->data_type;
3009
0
    ColumnPtr struct_column = make_variant_struct_read_column(*_field_schema, variant_struct_type);
3010
0
    const size_t old_struct_rows = struct_column->size();
3011
0
    auto const_node = TableSchemaChangeHelper::ConstNode::get_instance();
3012
0
    RETURN_IF_ERROR(_struct_reader->read_column_data(struct_column, variant_struct_type, const_node,
3013
0
                                                     filter_map, batch_size, read_rows, eof,
3014
0
                                                     is_dict_filter, real_column_size));
3015
3016
0
    const size_t new_struct_rows = struct_column->size() - old_struct_rows;
3017
0
    if (new_struct_rows == 0) {
3018
0
        return Status::OK();
3019
0
    }
3020
3021
0
    const IColumn* variant_struct_source = struct_column.get();
3022
0
    const NullMap* struct_null_map = nullptr;
3023
0
    if (const auto* nullable_struct = check_and_get_column<ColumnNullable>(variant_struct_source)) {
3024
0
        struct_null_map = &nullable_struct->get_null_map_data();
3025
0
        variant_struct_source = &nullable_struct->get_nested_column();
3026
0
    }
3027
0
    const auto& variant_struct_column = assert_cast<const ColumnStruct&>(*variant_struct_source);
3028
3029
0
    RETURN_IF_ERROR(append_variant_struct_rows_to_column(
3030
0
            *_field_schema, variant_struct_column, struct_null_map, old_struct_rows,
3031
0
            new_struct_rows, _column_ids, doris_column, &_variant_statistics));
3032
0
#ifndef NDEBUG
3033
0
    doris_column->sanity_check();
3034
0
#endif
3035
0
    return Status::OK();
3036
0
}
3037
3038
template class ScalarColumnReader<true, true>;
3039
template class ScalarColumnReader<true, false>;
3040
template class ScalarColumnReader<false, true>;
3041
template class ScalarColumnReader<false, false>;
3042
3043
}; // namespace doris