Coverage Report

Created: 2026-05-16 22:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_column_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_column_reader.h"
19
20
#include <cctz/time_zone.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <rapidjson/document.h>
23
#include <sys/types.h>
24
25
#include <algorithm>
26
#include <cmath>
27
#include <limits>
28
#include <map>
29
#include <string_view>
30
#include <utility>
31
#include <vector>
32
33
#include "common/exception.h"
34
#include "common/status.h"
35
#include "core/column/column.h"
36
#include "core/column/column_array.h"
37
#include "core/column/column_map.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_struct.h"
41
#include "core/column/column_variant.h"
42
#include "core/data_type/data_type_array.h"
43
#include "core/data_type/data_type_factory.hpp"
44
#include "core/data_type/data_type_map.h"
45
#include "core/data_type/data_type_nullable.h"
46
#include "core/data_type/data_type_number.h"
47
#include "core/data_type/data_type_struct.h"
48
#include "core/data_type/data_type_variant.h"
49
#include "core/data_type/define_primitive_type.h"
50
#include "core/data_type_serde/data_type_serde.h"
51
#include "core/string_buffer.hpp"
52
#include "core/value/jsonb_value.h"
53
#include "core/value/timestamptz_value.h"
54
#include "core/value/vdatetime_value.h"
55
#include "exec/common/variant_util.h"
56
#include "format/parquet/level_decoder.h"
57
#include "format/parquet/parquet_variant_reader.h"
58
#include "format/parquet/schema_desc.h"
59
#include "format/parquet/vparquet_column_chunk_reader.h"
60
#include "io/fs/tracing_file_reader.h"
61
#include "runtime/runtime_profile.h"
62
#include "util/jsonb_document.h"
63
64
namespace doris {
65
static void fill_struct_null_map(FieldSchema* field, NullMap& null_map,
66
                                 const std::vector<level_t>& rep_levels,
67
11
                                 const std::vector<level_t>& def_levels) {
68
11
    size_t num_levels = def_levels.size();
69
11
    DCHECK_EQ(num_levels, rep_levels.size());
70
11
    size_t origin_size = null_map.size();
71
11
    null_map.resize(origin_size + num_levels);
72
11
    size_t pos = origin_size;
73
26
    for (size_t i = 0; i < num_levels; ++i) {
74
        // skip the levels affect its ancestor or its descendants
75
15
        if (def_levels[i] < field->repeated_parent_def_level ||
76
15
            rep_levels[i] > field->repetition_level) {
77
0
            continue;
78
0
        }
79
15
        if (def_levels[i] >= field->definition_level) {
80
15
            null_map[pos++] = 0;
81
15
        } else {
82
0
            null_map[pos++] = 1;
83
0
        }
84
15
    }
85
11
    null_map.resize(pos);
86
11
}
87
88
static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offsets_data,
89
                              NullMap* null_map_ptr, const std::vector<level_t>& rep_levels,
90
2
                              const std::vector<level_t>& def_levels) {
91
2
    size_t num_levels = rep_levels.size();
92
2
    DCHECK_EQ(num_levels, def_levels.size());
93
2
    size_t origin_size = offsets_data.size();
94
2
    offsets_data.resize(origin_size + num_levels);
95
2
    if (null_map_ptr != nullptr) {
96
2
        null_map_ptr->resize(origin_size + num_levels);
97
2
    }
98
2
    size_t offset_pos = origin_size - 1;
99
8
    for (size_t i = 0; i < num_levels; ++i) {
100
        // skip the levels affect its ancestor or its descendants
101
6
        if (def_levels[i] < field->repeated_parent_def_level ||
102
6
            rep_levels[i] > field->repetition_level) {
103
0
            continue;
104
0
        }
105
6
        if (rep_levels[i] == field->repetition_level) {
106
4
            offsets_data[offset_pos]++;
107
4
            continue;
108
4
        }
109
2
        offset_pos++;
110
2
        offsets_data[offset_pos] = offsets_data[offset_pos - 1];
111
2
        if (def_levels[i] >= field->definition_level) {
112
2
            offsets_data[offset_pos]++;
113
2
        }
114
2
        if (def_levels[i] >= field->definition_level - 1) {
115
2
            (*null_map_ptr)[offset_pos] = 0;
116
2
        } else {
117
0
            (*null_map_ptr)[offset_pos] = 1;
118
0
        }
119
2
    }
120
2
    offsets_data.resize(offset_pos + 1);
121
2
    if (null_map_ptr != nullptr) {
122
2
        null_map_ptr->resize(offset_pos + 1);
123
2
    }
124
2
}
125
126
static constexpr int64_t UNIX_EPOCH_DAYNR = 719528;
127
static constexpr int64_t MICROS_PER_SECOND = 1000000;
128
129
0
static int64_t variant_date_value(const VecDateTimeValue& value) {
130
0
    return value.daynr() - UNIX_EPOCH_DAYNR;
131
0
}
132
133
1
static int64_t variant_date_value(const DateV2Value<DateV2ValueType>& value) {
134
1
    return value.daynr() - UNIX_EPOCH_DAYNR;
135
1
}
136
137
0
static int64_t variant_datetime_value(const VecDateTimeValue& value) {
138
0
    int64_t timestamp = 0;
139
0
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
140
0
    return timestamp * MICROS_PER_SECOND;
141
0
}
142
143
1
static int64_t variant_datetime_value(const DateV2Value<DateTimeV2ValueType>& value) {
144
1
    int64_t timestamp = 0;
145
1
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
146
1
    return timestamp * MICROS_PER_SECOND + value.microsecond();
147
1
}
148
149
0
static int64_t variant_datetime_value(const TimestampTzValue& value) {
150
0
    int64_t timestamp = 0;
151
0
    value.unix_timestamp(&timestamp, cctz::utc_time_zone());
152
0
    return timestamp * MICROS_PER_SECOND + value.microsecond();
153
0
}
154
155
52
static int find_child_idx(const FieldSchema& field, std::string_view name) {
156
108
    for (int i = 0; i < field.children.size(); ++i) {
157
97
        if (field.children[i].lower_case_name == name) {
158
41
            return i;
159
41
        }
160
97
    }
161
11
    return -1;
162
52
}
163
164
0
static bool is_variant_wrapper_typed_value_child(const FieldSchema& field) {
165
0
    auto type = remove_nullable(field.data_type);
166
0
    return type->get_primitive_type() == TYPE_STRUCT || type->get_primitive_type() == TYPE_ARRAY;
167
0
}
168
169
static bool is_variant_wrapper_field(const FieldSchema& field,
170
                                     bool allow_scalar_typed_value_only_wrapper,
171
36
                                     bool allow_value_only_wrapper = false) {
172
36
    auto type = remove_nullable(field.data_type);
173
36
    if (type->get_primitive_type() != TYPE_STRUCT && type->get_primitive_type() != TYPE_VARIANT) {
174
30
        return false;
175
30
    }
176
177
6
    bool has_metadata = false;
178
6
    bool has_value = false;
179
6
    const FieldSchema* typed_value = nullptr;
180
10
    for (const auto& child : field.children) {
181
10
        if (child.lower_case_name == "metadata") {
182
1
            if (child.physical_type != tparquet::Type::BYTE_ARRAY) {
183
0
                return false;
184
0
            }
185
1
            has_metadata = true;
186
1
            continue;
187
1
        }
188
9
        if (child.lower_case_name == "value") {
189
5
            if (child.physical_type != tparquet::Type::BYTE_ARRAY) {
190
2
                return false;
191
2
            }
192
3
            has_value = true;
193
3
            continue;
194
5
        }
195
4
        if (child.lower_case_name == "typed_value") {
196
3
            typed_value = &child;
197
3
            continue;
198
3
        }
199
1
        return false;
200
4
    }
201
3
    if (has_metadata && has_value) {
202
1
        return type->get_primitive_type() == TYPE_VARIANT || typed_value != nullptr;
203
1
    }
204
2
    if (has_value) {
205
2
        return typed_value != nullptr || allow_value_only_wrapper;
206
2
    }
207
0
    return typed_value != nullptr && (allow_scalar_typed_value_only_wrapper ||
208
0
                                      is_variant_wrapper_typed_value_child(*typed_value));
209
2
}
210
211
22
static Status get_binary_field(const Field& field, std::string* value, bool* present) {
212
22
    if (field.is_null()) {
213
3
        *present = false;
214
3
        return Status::OK();
215
3
    }
216
19
    *present = true;
217
19
    switch (field.get_type()) {
218
19
    case TYPE_STRING:
219
19
        *value = field.get<TYPE_STRING>();
220
19
        return Status::OK();
221
0
    case TYPE_CHAR:
222
0
        *value = field.get<TYPE_CHAR>();
223
0
        return Status::OK();
224
0
    case TYPE_VARCHAR:
225
0
        *value = field.get<TYPE_VARCHAR>();
226
0
        return Status::OK();
227
0
    case TYPE_VARBINARY: {
228
0
        auto ref = field.get<TYPE_VARBINARY>().to_string_ref();
229
0
        value->assign(ref.data, ref.size);
230
0
        return Status::OK();
231
0
    }
232
0
    default:
233
0
        return Status::Corruption("Parquet VARIANT binary field has unexpected Doris type {}",
234
0
                                  field.get_type_name());
235
19
    }
236
19
}
237
238
8
static PathInData append_path(const PathInData& prefix, const PathInData& suffix) {
239
8
    if (prefix.empty()) {
240
7
        return suffix;
241
7
    }
242
1
    if (suffix.empty()) {
243
0
        return prefix;
244
0
    }
245
1
    PathInDataBuilder builder;
246
1
    builder.append(prefix.get_parts(), false);
247
1
    builder.append(suffix.get_parts(), false);
248
1
    return builder.build();
249
1
}
250
251
7
static Status insert_empty_object_marker(const PathInData& path, VariantMap* values) {
252
7
    JsonBinaryValue empty_object;
253
7
    RETURN_IF_ERROR(empty_object.from_json_string("{}"));
254
7
    (*values)[path] = FieldWithDataType {.field = Field::create_field<TYPE_JSONB>(JsonbField(
255
7
                                                 empty_object.value(), empty_object.size())),
256
7
                                         .base_scalar_type_id = TYPE_JSONB};
257
7
    return Status::OK();
258
7
}
259
260
25
static bool is_empty_object_marker(const FieldWithDataType& value) {
261
25
    if (value.field.get_type() != TYPE_JSONB) {
262
15
        return false;
263
15
    }
264
10
    const auto& jsonb = value.field.get<TYPE_JSONB>();
265
10
    const JsonbDocument* document = nullptr;
266
10
    Status st =
267
10
            JsonbDocument::checkAndCreateDocument(jsonb.get_value(), jsonb.get_size(), &document);
268
10
    if (!st.ok() || document == nullptr || document->getValue() == nullptr ||
269
10
        !document->getValue()->isObject()) {
270
0
        return false;
271
0
    }
272
10
    return document->getValue()->unpack<ObjectVal>()->numElem() == 0;
273
10
}
274
275
static Status collect_empty_object_markers(const rapidjson::Value& value, PathInDataBuilder* path,
276
5
                                           VariantMap* values) {
277
5
    if (!value.IsObject()) {
278
0
        return Status::OK();
279
0
    }
280
5
    if (value.MemberCount() == 0) {
281
4
        return insert_empty_object_marker(path->build(), values);
282
4
    }
283
2
    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
284
1
        if (it->value.IsObject()) {
285
1
            path->append(std::string_view(it->name.GetString(), it->name.GetStringLength()), false);
286
1
            RETURN_IF_ERROR(collect_empty_object_markers(it->value, path, values));
287
1
            path->pop_back();
288
1
        }
289
1
    }
290
1
    return Status::OK();
291
1
}
292
293
static Status add_empty_object_markers_from_json(const std::string& json, const PathInData& prefix,
294
12
                                                 VariantMap* values) {
295
12
    if (json.find("{}") == std::string::npos) {
296
8
        return Status::OK();
297
8
    }
298
4
    rapidjson::Document document;
299
4
    document.Parse(json.data(), json.size());
300
4
    if (document.HasParseError()) {
301
0
        return Status::Corruption("Invalid Parquet VARIANT decoded JSON");
302
0
    }
303
4
    PathInDataBuilder path;
304
4
    path.append(prefix.get_parts(), false);
305
4
    return collect_empty_object_markers(document, &path, values);
306
4
}
307
308
static Status parse_json_to_variant_map(const std::string& json, const PathInData& prefix,
309
12
                                        VariantMap* values) {
310
12
    auto parsed_column = ColumnVariant::create(0, false);
311
12
    ParseConfig parse_config;
312
12
    StringRef json_ref(json.data(), json.size());
313
12
    RETURN_IF_CATCH_EXCEPTION(
314
12
            variant_util::parse_json_to_variant(*parsed_column, json_ref, nullptr, parse_config));
315
12
    Field parsed = (*parsed_column)[0];
316
12
    if (!parsed.is_null()) {
317
8
        auto& parsed_values = parsed.get<TYPE_VARIANT>();
318
8
        for (auto& [path, value] : parsed_values) {
319
8
            (*values)[append_path(prefix, path)] = std::move(value);
320
8
        }
321
8
    }
322
12
    RETURN_IF_ERROR(add_empty_object_markers_from_json(json, prefix, values));
323
12
    return Status::OK();
324
12
}
325
326
2
static Status variant_map_to_json(VariantMap values, std::string* json) {
327
2
    auto variant_column = ColumnVariant::create(0, false);
328
2
    RETURN_IF_CATCH_EXCEPTION(
329
2
            variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
330
2
    DataTypeSerDe::FormatOptions options;
331
2
    variant_column->serialize_one_row_to_string(0, json, options);
332
2
    return Status::OK();
333
2
}
334
335
14
static bool path_has_prefix(const PathInData& path, const PathInData& prefix) {
336
14
    const auto& parts = path.get_parts();
337
14
    const auto& prefix_parts = prefix.get_parts();
338
14
    if (parts.size() < prefix_parts.size()) {
339
0
        return false;
340
0
    }
341
15
    for (size_t i = 0; i < prefix_parts.size(); ++i) {
342
1
        if (parts[i] != prefix_parts[i]) {
343
0
            return false;
344
0
        }
345
1
    }
346
14
    return true;
347
14
}
348
349
15
static bool has_descendant_path(const VariantMap& values, const PathInData& prefix) {
350
15
    const size_t prefix_size = prefix.get_parts().size();
351
15
    return std::ranges::any_of(values, [&](const auto& entry) {
352
12
        const auto& path = entry.first;
353
12
        return path.get_parts().size() > prefix_size && path_has_prefix(path, prefix);
354
12
    });
355
15
}
356
357
static void erase_shadowed_empty_object_markers(VariantMap* values,
358
26
                                                const VariantMap& shadowing_values) {
359
46
    for (auto it = values->begin(); it != values->end();) {
360
20
        if (is_empty_object_marker(it->second) &&
361
20
            (has_descendant_path(*values, it->first) ||
362
7
             has_descendant_path(shadowing_values, it->first))) {
363
3
            it = values->erase(it);
364
3
            continue;
365
3
        }
366
17
        ++it;
367
17
    }
368
26
}
369
370
static void erase_shadowed_empty_object_markers(VariantMap* value_values,
371
13
                                                VariantMap* typed_values) {
372
13
    erase_shadowed_empty_object_markers(value_values, *typed_values);
373
13
    erase_shadowed_empty_object_markers(typed_values, *value_values);
374
13
}
375
376
static Status check_no_shredded_value_typed_duplicates(const VariantMap& value_values,
377
                                                       const VariantMap& typed_values,
378
13
                                                       const PathInData& prefix) {
379
13
    const size_t prefix_size = prefix.get_parts().size();
380
13
    for (const auto& value_entry : value_values) {
381
7
        const auto& value_path = value_entry.first;
382
7
        if (!path_has_prefix(value_path, prefix)) {
383
0
            continue;
384
0
        }
385
7
        if (value_path.get_parts().size() == prefix_size) {
386
1
            if (is_empty_object_marker(value_entry.second) &&
387
1
                !has_descendant_path(typed_values, value_path)) {
388
1
                continue;
389
1
            }
390
0
            if (!typed_values.empty()) {
391
0
                return Status::Corruption(
392
0
                        "Parquet VARIANT residual value conflicts with typed_value at path {}",
393
0
                        value_path.get_path());
394
0
            }
395
0
            continue;
396
0
        }
397
6
        for (const auto& typed_entry : typed_values) {
398
4
            const auto& typed_path = typed_entry.first;
399
4
            if (!path_has_prefix(typed_path, prefix)) {
400
0
                continue;
401
0
            }
402
4
            if (typed_path.get_parts().size() == prefix_size) {
403
0
                if (is_empty_object_marker(typed_entry.second) &&
404
0
                    !has_descendant_path(value_values, typed_path)) {
405
0
                    continue;
406
0
                }
407
0
                return Status::Corruption(
408
0
                        "Parquet VARIANT residual value and typed_value contain duplicate field {}",
409
0
                        value_path.get_parts()[prefix_size].key);
410
0
            }
411
4
            if (value_path.get_parts()[prefix_size] == typed_path.get_parts()[prefix_size]) {
412
3
                if (value_path == typed_path && is_empty_object_marker(value_entry.second) &&
413
3
                    is_empty_object_marker(typed_entry.second)) {
414
1
                    continue;
415
1
                }
416
2
                return Status::Corruption(
417
2
                        "Parquet VARIANT residual value and typed_value contain duplicate field {}",
418
2
                        value_path.get_parts()[prefix_size].key);
419
3
            }
420
4
        }
421
6
    }
422
11
    return Status::OK();
423
13
}
424
425
9
static bool has_direct_typed_parent_null(const std::vector<const NullMap*>& null_maps, size_t row) {
426
18
    return std::ranges::any_of(null_maps, [&](const NullMap* null_map) {
427
18
        DCHECK_LT(row, null_map->size());
428
18
        return (*null_map)[row];
429
18
    });
430
9
}
431
432
static void insert_direct_typed_leaf_range(const IColumn& column, size_t start, size_t rows,
433
                                           const std::vector<const NullMap*>& parent_null_maps,
434
5
                                           IColumn* variant_leaf) {
435
5
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
436
5
    const IColumn* value_column = &column;
437
5
    const NullMap* leaf_null_map = nullptr;
438
5
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
439
0
        value_column = &nullable_column->get_nested_column();
440
0
        leaf_null_map = &nullable_column->get_null_map_data();
441
0
    }
442
443
5
    nullable_leaf.get_nested_column().insert_range_from(*value_column, start, rows);
444
5
    auto& null_map = nullable_leaf.get_null_map_data();
445
5
    null_map.reserve(null_map.size() + rows);
446
11
    for (size_t i = 0; i < rows; ++i) {
447
6
        const size_t row = start + i;
448
6
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
449
6
        null_map.push_back(leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row));
450
6
    }
451
5
}
452
453
16
static bool is_temporal_variant_leaf_type(PrimitiveType type) {
454
16
    switch (type) {
455
2
    case TYPE_TIMEV2:
456
2
    case TYPE_DATE:
457
2
    case TYPE_DATETIME:
458
4
    case TYPE_DATEV2:
459
6
    case TYPE_DATETIMEV2:
460
6
    case TYPE_TIMESTAMPTZ:
461
6
        return true;
462
10
    default:
463
10
        return false;
464
16
    }
465
16
}
466
467
8
static DataTypePtr direct_variant_leaf_type(const DataTypePtr& data_type) {
468
8
    const auto& type = remove_nullable(data_type);
469
8
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
470
3
        return std::make_shared<DataTypeInt64>();
471
3
    }
472
5
    return type;
473
8
}
474
475
0
static bool contains_temporal_variant_leaf_type(const DataTypePtr& data_type) {
476
0
    const auto& type = remove_nullable(data_type);
477
0
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
478
0
        return true;
479
0
    }
480
0
    if (type->get_primitive_type() == TYPE_ARRAY) {
481
0
        return contains_temporal_variant_leaf_type(
482
0
                assert_cast<const DataTypeArray*>(type.get())->get_nested_type());
483
0
    }
484
0
    return false;
485
0
}
486
487
static int64_t direct_temporal_variant_value(PrimitiveType type, const IColumn& column,
488
3
                                             size_t row) {
489
3
    switch (type) {
490
1
    case TYPE_TIMEV2:
491
1
        return static_cast<int64_t>(
492
1
                std::llround(assert_cast<const ColumnTimeV2&>(column).get_data()[row]));
493
0
    case TYPE_DATE:
494
0
        return variant_date_value(assert_cast<const ColumnDate&>(column).get_data()[row]);
495
0
    case TYPE_DATETIME:
496
0
        return variant_datetime_value(assert_cast<const ColumnDateTime&>(column).get_data()[row]);
497
1
    case TYPE_DATEV2:
498
1
        return variant_date_value(assert_cast<const ColumnDateV2&>(column).get_data()[row]);
499
1
    case TYPE_DATETIMEV2:
500
1
        return variant_datetime_value(assert_cast<const ColumnDateTimeV2&>(column).get_data()[row]);
501
0
    case TYPE_TIMESTAMPTZ:
502
0
        return variant_datetime_value(
503
0
                assert_cast<const ColumnTimeStampTz&>(column).get_data()[row]);
504
0
    default:
505
0
        DORIS_CHECK(false);
506
0
        return 0;
507
3
    }
508
3
}
509
510
static void insert_direct_typed_temporal_leaf_range(
511
        PrimitiveType type, const IColumn& column, size_t start, size_t rows,
512
3
        const std::vector<const NullMap*>& parent_null_maps, IColumn* variant_leaf) {
513
3
    auto& nullable_leaf = assert_cast<ColumnNullable&>(*variant_leaf);
514
3
    const IColumn* value_column = &column;
515
3
    const NullMap* leaf_null_map = nullptr;
516
3
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
517
0
        value_column = &nullable_column->get_nested_column();
518
0
        leaf_null_map = &nullable_column->get_null_map_data();
519
0
    }
520
521
3
    auto& data = assert_cast<ColumnInt64&>(nullable_leaf.get_nested_column()).get_data();
522
3
    data.reserve(data.size() + rows);
523
3
    auto& null_map = nullable_leaf.get_null_map_data();
524
3
    null_map.reserve(null_map.size() + rows);
525
6
    for (size_t i = 0; i < rows; ++i) {
526
3
        const size_t row = start + i;
527
3
        data.push_back(direct_temporal_variant_value(type, *value_column, row));
528
3
        const bool leaf_is_null = leaf_null_map != nullptr && (*leaf_null_map)[row];
529
3
        null_map.push_back(leaf_is_null || has_direct_typed_parent_null(parent_null_maps, row));
530
3
    }
531
3
}
532
533
3
static void append_json_string(std::string_view value, std::string* json) {
534
3
    auto column = ColumnString::create();
535
3
    VectorBufferWriter writer(*column);
536
3
    writer.write_json_string(value);
537
3
    writer.commit();
538
3
    json->append(column->get_data_at(0).data, column->get_data_at(0).size);
539
3
}
540
541
static bool is_column_selected(const FieldSchema& field_schema,
542
53
                               const std::set<uint64_t>& column_ids) {
543
53
    return column_ids.empty() || column_ids.find(field_schema.get_column_id()) != column_ids.end();
544
53
}
545
546
static bool has_selected_column(const FieldSchema& field_schema,
547
53
                                const std::set<uint64_t>& column_ids) {
548
53
    if (is_column_selected(field_schema, column_ids)) {
549
45
        return true;
550
45
    }
551
8
    return std::any_of(field_schema.children.begin(), field_schema.children.end(),
552
8
                       [&column_ids](const FieldSchema& child) {
553
5
                           return has_selected_column(child, column_ids);
554
5
                       });
555
53
}
556
557
12
static bool is_direct_variant_leaf_type(const DataTypePtr& data_type) {
558
12
    const auto& type = remove_nullable(data_type);
559
12
    switch (type->get_primitive_type()) {
560
0
    case TYPE_BOOLEAN:
561
0
    case TYPE_TINYINT:
562
0
    case TYPE_SMALLINT:
563
4
    case TYPE_INT:
564
9
    case TYPE_BIGINT:
565
9
    case TYPE_LARGEINT:
566
9
    case TYPE_DECIMALV2:
567
9
    case TYPE_DECIMAL32:
568
9
    case TYPE_DECIMAL64:
569
9
    case TYPE_DECIMAL128I:
570
9
    case TYPE_DECIMAL256:
571
9
    case TYPE_STRING:
572
9
    case TYPE_CHAR:
573
9
    case TYPE_VARCHAR:
574
9
        return true;
575
1
    case TYPE_TIMEV2:
576
1
    case TYPE_DATE:
577
1
    case TYPE_DATETIME:
578
2
    case TYPE_DATEV2:
579
3
    case TYPE_DATETIMEV2:
580
3
    case TYPE_TIMESTAMPTZ:
581
3
        return true;
582
0
    case TYPE_ARRAY: {
583
0
        const auto* array_type = assert_cast<const DataTypeArray*>(type.get());
584
0
        return !contains_temporal_variant_leaf_type(array_type->get_nested_type()) &&
585
0
               is_direct_variant_leaf_type(array_type->get_nested_type());
586
3
    }
587
0
    default:
588
0
        return false;
589
12
    }
590
12
}
591
592
static bool can_direct_read_typed_value(const FieldSchema& field_schema, bool allow_variant_wrapper,
593
17
                                        const std::set<uint64_t>& column_ids) {
594
17
    if (!has_selected_column(field_schema, column_ids)) {
595
0
        return true;
596
0
    }
597
17
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
598
0
        const int value_idx = find_child_idx(field_schema, "value");
599
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
600
0
        return (value_idx < 0 ||
601
0
                !has_selected_column(field_schema.children[value_idx], column_ids)) &&
602
0
               typed_value_idx >= 0 &&
603
0
               can_direct_read_typed_value(field_schema.children[typed_value_idx], false,
604
0
                                           column_ids);
605
0
    }
606
607
17
    const auto& type = remove_nullable(field_schema.data_type);
608
17
    if (type->get_primitive_type() == TYPE_STRUCT) {
609
7
        return std::all_of(field_schema.children.begin(), field_schema.children.end(),
610
11
                           [&column_ids](const FieldSchema& child) {
611
11
                               return can_direct_read_typed_value(child, true, column_ids);
612
11
                           });
613
7
    }
614
10
    return is_direct_variant_leaf_type(field_schema.data_type);
615
17
}
616
617
static bool has_selected_direct_typed_leaf(const FieldSchema& field_schema,
618
                                           bool allow_variant_wrapper,
619
7
                                           const std::set<uint64_t>& column_ids) {
620
7
    if (!has_selected_column(field_schema, column_ids)) {
621
2
        return false;
622
2
    }
623
5
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
624
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
625
0
        DCHECK_GE(typed_value_idx, 0);
626
0
        return has_selected_direct_typed_leaf(field_schema.children[typed_value_idx], false,
627
0
                                              column_ids);
628
0
    }
629
630
5
    const auto& type = remove_nullable(field_schema.data_type);
631
5
    if (type->get_primitive_type() == TYPE_STRUCT) {
632
3
        return std::any_of(field_schema.children.begin(), field_schema.children.end(),
633
3
                           [&column_ids](const FieldSchema& child) {
634
3
                               return has_selected_direct_typed_leaf(child, true, column_ids);
635
3
                           });
636
3
    }
637
2
    return is_direct_variant_leaf_type(field_schema.data_type);
638
5
}
639
640
static bool can_use_direct_typed_only_value(const FieldSchema& variant_field,
641
5
                                            const std::set<uint64_t>& column_ids) {
642
5
    const int value_idx = find_child_idx(variant_field, "value");
643
5
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
644
5
    return (value_idx < 0 || !has_selected_column(variant_field.children[value_idx], column_ids)) &&
645
5
           typed_value_idx >= 0 &&
646
5
           has_selected_direct_typed_leaf(variant_field.children[typed_value_idx], false,
647
4
                                          column_ids) &&
648
5
           can_direct_read_typed_value(variant_field.children[typed_value_idx], false, column_ids);
649
5
}
650
651
7
static void fill_variant_field_info(FieldWithDataType* value) {
652
7
    FieldInfo info;
653
7
    variant_util::get_field_info(value->field, &info);
654
7
    DCHECK_LE(info.num_dimensions, std::numeric_limits<uint8_t>::max());
655
7
    value->base_scalar_type_id = info.scalar_type_id;
656
7
    value->num_dimensions = static_cast<uint8_t>(info.num_dimensions);
657
7
}
658
659
static Status field_to_variant_field(const FieldSchema& field_schema, const Field& field,
660
7
                                     FieldWithDataType* value, bool* present) {
661
7
    if (field.is_null()) {
662
0
        *present = false;
663
0
        return Status::OK();
664
0
    }
665
7
    *present = true;
666
7
    const DataTypePtr& type = remove_nullable(field_schema.data_type);
667
7
    switch (type->get_primitive_type()) {
668
0
    case TYPE_BOOLEAN:
669
0
    case TYPE_TINYINT:
670
0
    case TYPE_SMALLINT:
671
5
    case TYPE_INT:
672
5
    case TYPE_BIGINT:
673
5
    case TYPE_LARGEINT:
674
5
    case TYPE_DECIMALV2:
675
5
    case TYPE_DECIMAL32:
676
5
    case TYPE_DECIMAL64:
677
5
    case TYPE_DECIMAL128I:
678
5
    case TYPE_DECIMAL256:
679
7
    case TYPE_STRING:
680
7
    case TYPE_CHAR:
681
7
    case TYPE_VARCHAR:
682
7
    case TYPE_ARRAY:
683
7
        value->field = field;
684
7
        fill_variant_field_info(value);
685
7
        value->precision = type->get_precision();
686
7
        value->scale = type->get_scale();
687
7
        return Status::OK();
688
0
    case TYPE_FLOAT: {
689
0
        const auto float_value = field.get<TYPE_FLOAT>();
690
0
        if (!std::isfinite(float_value)) {
691
0
            return Status::NotSupported(
692
0
                    "Parquet VARIANT non-finite floating point typed_value is not supported");
693
0
        }
694
0
        value->field = field;
695
0
        fill_variant_field_info(value);
696
0
        return Status::OK();
697
0
    }
698
0
    case TYPE_DOUBLE: {
699
0
        const auto double_value = field.get<TYPE_DOUBLE>();
700
0
        if (!std::isfinite(double_value)) {
701
0
            return Status::NotSupported(
702
0
                    "Parquet VARIANT non-finite floating point typed_value is not supported");
703
0
        }
704
0
        value->field = field;
705
0
        fill_variant_field_info(value);
706
0
        return Status::OK();
707
0
    }
708
0
    case TYPE_TIMEV2:
709
0
        value->field = Field::create_field<TYPE_BIGINT>(
710
0
                static_cast<int64_t>(std::llround(field.get<TYPE_TIMEV2>())));
711
0
        value->base_scalar_type_id = TYPE_BIGINT;
712
0
        return Status::OK();
713
0
    case TYPE_DATE:
714
0
        value->field = Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATE>()));
715
0
        value->base_scalar_type_id = TYPE_BIGINT;
716
0
        return Status::OK();
717
0
    case TYPE_DATETIME:
718
0
        value->field = Field::create_field<TYPE_BIGINT>(
719
0
                variant_datetime_value(field.get<TYPE_DATETIME>()));
720
0
        value->base_scalar_type_id = TYPE_BIGINT;
721
0
        return Status::OK();
722
0
    case TYPE_DATEV2:
723
0
        value->field =
724
0
                Field::create_field<TYPE_BIGINT>(variant_date_value(field.get<TYPE_DATEV2>()));
725
0
        value->base_scalar_type_id = TYPE_BIGINT;
726
0
        return Status::OK();
727
0
    case TYPE_DATETIMEV2:
728
0
        value->field = Field::create_field<TYPE_BIGINT>(
729
0
                variant_datetime_value(field.get<TYPE_DATETIMEV2>()));
730
0
        value->base_scalar_type_id = TYPE_BIGINT;
731
0
        return Status::OK();
732
0
    case TYPE_TIMESTAMPTZ:
733
0
        value->field = Field::create_field<TYPE_BIGINT>(
734
0
                variant_datetime_value(field.get<TYPE_TIMESTAMPTZ>()));
735
0
        value->base_scalar_type_id = TYPE_BIGINT;
736
0
        return Status::OK();
737
0
    case TYPE_VARBINARY:
738
0
        return Status::NotSupported("Parquet VARIANT binary typed_value is not supported");
739
0
    default:
740
0
        return Status::Corruption("Unsupported Parquet VARIANT typed_value Doris type {}",
741
0
                                  type->get_name());
742
7
    }
743
7
}
744
745
static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field,
746
                                  const std::string& metadata, std::string* json, bool* present);
747
748
static Status serialize_field_to_json(const DataTypePtr& data_type, const Field& field,
749
3
                                      std::string* json) {
750
3
    MutableColumnPtr column = data_type->create_column();
751
3
    column->insert(field);
752
753
3
    auto json_column = ColumnString::create();
754
3
    VectorBufferWriter writer(*json_column);
755
3
    auto serde = data_type->get_serde();
756
3
    DataTypeSerDe::FormatOptions options;
757
3
    RETURN_IF_ERROR(serde->serialize_one_cell_to_json(*column, 0, writer, options));
758
3
    writer.commit();
759
3
    *json = json_column->get_data_at(0).to_string();
760
3
    return Status::OK();
761
3
}
762
763
static Status scalar_typed_value_to_json(const FieldSchema& field_schema, const Field& field,
764
3
                                         std::string* json, bool* present) {
765
3
    FieldWithDataType value;
766
3
    RETURN_IF_ERROR(field_to_variant_field(field_schema, field, &value, present));
767
3
    if (!*present) {
768
0
        return Status::OK();
769
0
    }
770
3
    if (value.field.is_null()) {
771
0
        *json = "null";
772
0
        return Status::OK();
773
0
    }
774
775
3
    DataTypePtr json_type;
776
3
    if (value.base_scalar_type_id != PrimitiveType::INVALID_TYPE) {
777
3
        json_type = DataTypeFactory::instance().create_data_type(value.base_scalar_type_id, false,
778
3
                                                                 value.precision, value.scale);
779
3
    } else {
780
0
        json_type = remove_nullable(field_schema.data_type);
781
0
    }
782
3
    return serialize_field_to_json(json_type, value.field, json);
783
3
}
784
785
static Status variant_to_json(const FieldSchema& variant_field, const Field& field,
786
                              const std::string* inherited_metadata, std::string* json,
787
4
                              bool* present) {
788
4
    if (field.is_null()) {
789
1
        *present = false;
790
1
        return Status::OK();
791
1
    }
792
793
3
    const auto& fields = field.get<TYPE_STRUCT>();
794
3
    const int metadata_idx = find_child_idx(variant_field, "metadata");
795
3
    const int value_idx = find_child_idx(variant_field, "value");
796
3
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
797
798
3
    std::string metadata;
799
3
    bool has_metadata = false;
800
3
    if (inherited_metadata != nullptr) {
801
3
        metadata = *inherited_metadata;
802
3
        has_metadata = true;
803
3
    }
804
3
    if (metadata_idx >= 0) {
805
0
        bool metadata_present = false;
806
0
        RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], &metadata, &metadata_present));
807
0
        has_metadata = metadata_present;
808
0
    }
809
810
3
    std::string typed_json;
811
3
    bool typed_present = false;
812
3
    if (typed_value_idx >= 0) {
813
3
        RETURN_IF_ERROR(typed_value_to_json(variant_field.children[typed_value_idx],
814
3
                                            fields[typed_value_idx], metadata, &typed_json,
815
3
                                            &typed_present));
816
3
    }
817
818
3
    std::string value_json;
819
3
    bool value_present = false;
820
3
    if (value_idx >= 0) {
821
3
        std::string value;
822
3
        RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, &value_present));
823
3
        if (value_present) {
824
3
            if (!has_metadata) {
825
0
                return Status::Corruption("Parquet VARIANT value is present without metadata");
826
0
            }
827
3
            RETURN_IF_ERROR(parquet::decode_variant_to_json(
828
3
                    StringRef(metadata.data(), metadata.size()),
829
3
                    StringRef(value.data(), value.size()), &value_json));
830
3
        }
831
3
    }
832
833
3
    if (value_present && typed_present) {
834
3
        VariantMap value_values;
835
3
        RETURN_IF_ERROR(parse_json_to_variant_map(value_json, PathInData(), &value_values));
836
3
        VariantMap typed_values;
837
3
        RETURN_IF_ERROR(parse_json_to_variant_map(typed_json, PathInData(), &typed_values));
838
3
        erase_shadowed_empty_object_markers(&value_values, &typed_values);
839
3
        auto root_value = value_values.find(PathInData());
840
3
        if (root_value != value_values.end() && !is_empty_object_marker(root_value->second)) {
841
0
            return Status::Corruption(
842
0
                    "Parquet VARIANT has conflicting non-object value and typed_value");
843
0
        }
844
3
        RETURN_IF_ERROR(
845
3
                check_no_shredded_value_typed_duplicates(value_values, typed_values, PathInData()));
846
2
        value_values.merge(std::move(typed_values));
847
2
        RETURN_IF_ERROR(variant_map_to_json(std::move(value_values), json));
848
2
        *present = true;
849
2
        return Status::OK();
850
2
    }
851
852
0
    if (typed_present) {
853
0
        *json = std::move(typed_json);
854
0
        *present = true;
855
0
        return Status::OK();
856
0
    }
857
0
    if (value_present) {
858
0
        *json = std::move(value_json);
859
0
        *present = true;
860
0
        return Status::OK();
861
0
    }
862
863
0
    *present = false;
864
0
    return Status::OK();
865
0
}
866
867
static Status shredded_field_to_json(const FieldSchema& field_schema, const Field& field,
868
                                     const std::string& metadata, std::string* json, bool* present,
869
4
                                     bool allow_scalar_typed_value_only_wrapper) {
870
4
    if (is_variant_wrapper_field(field_schema, allow_scalar_typed_value_only_wrapper, true)) {
871
1
        return variant_to_json(field_schema, field, &metadata, json, present);
872
1
    }
873
3
    return typed_value_to_json(field_schema, field, metadata, json, present);
874
4
}
875
876
static Status typed_array_to_json(const FieldSchema& typed_value_field, const Field& field,
877
1
                                  const std::string& metadata, std::string* json, bool* present) {
878
1
    if (field.is_null()) {
879
0
        *present = false;
880
0
        return Status::OK();
881
0
    }
882
1
    if (typed_value_field.children.empty()) {
883
0
        return Status::Corruption("Parquet VARIANT array typed_value has no element schema");
884
0
    }
885
886
1
    const auto& elements = field.get<TYPE_ARRAY>();
887
1
    const auto& element_schema = typed_value_field.children[0];
888
1
    json->clear();
889
1
    json->push_back('[');
890
1
    for (size_t i = 0; i < elements.size(); ++i) {
891
1
        if (i != 0) {
892
0
            json->push_back(',');
893
0
        }
894
1
        std::string element_json;
895
1
        bool element_present = false;
896
1
        RETURN_IF_ERROR(shredded_field_to_json(element_schema, elements[i], metadata, &element_json,
897
1
                                               &element_present, true));
898
1
        if (element_present) {
899
0
            json->append(element_json);
900
1
        } else {
901
1
            return Status::Corruption("Parquet VARIANT array typed_value element is missing");
902
1
        }
903
1
    }
904
0
    json->push_back(']');
905
0
    *present = true;
906
0
    return Status::OK();
907
1
}
908
909
static Status typed_struct_to_json(const FieldSchema& typed_value_field, const Field& field,
910
3
                                   const std::string& metadata, std::string* json, bool* present) {
911
3
    if (field.is_null()) {
912
0
        *present = false;
913
0
        return Status::OK();
914
0
    }
915
916
3
    const auto& fields = field.get<TYPE_STRUCT>();
917
3
    json->clear();
918
3
    json->push_back('{');
919
3
    bool first = true;
920
6
    for (int i = 0; i < typed_value_field.children.size(); ++i) {
921
3
        std::string child_json;
922
3
        bool child_present = false;
923
3
        RETURN_IF_ERROR(shredded_field_to_json(typed_value_field.children[i], fields[i], metadata,
924
3
                                               &child_json, &child_present, false));
925
3
        if (!child_present) {
926
0
            continue;
927
0
        }
928
3
        if (!first) {
929
0
            json->push_back(',');
930
0
        }
931
3
        append_json_string(typed_value_field.children[i].name, json);
932
3
        json->push_back(':');
933
3
        json->append(child_json);
934
3
        first = false;
935
3
    }
936
3
    json->push_back('}');
937
3
    *present = true;
938
3
    return Status::OK();
939
3
}
940
941
static Status typed_value_to_json(const FieldSchema& typed_value_field, const Field& field,
942
7
                                  const std::string& metadata, std::string* json, bool* present) {
943
7
    const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type);
944
7
    switch (typed_type->get_primitive_type()) {
945
3
    case TYPE_STRUCT:
946
3
        return typed_struct_to_json(typed_value_field, field, metadata, json, present);
947
1
    case TYPE_ARRAY:
948
1
        return typed_array_to_json(typed_value_field, field, metadata, json, present);
949
3
    default:
950
3
        return scalar_typed_value_to_json(typed_value_field, field, json, present);
951
7
    }
952
7
}
953
954
static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
955
                                         const std::string& metadata, PathInDataBuilder* path,
956
                                         VariantMap* values, bool* present);
957
958
static Status variant_to_variant_map(const FieldSchema& variant_field, const Field& field,
959
                                     const std::string* inherited_metadata, PathInDataBuilder* path,
960
11
                                     VariantMap* values, bool* present) {
961
11
    if (field.is_null()) {
962
0
        *present = false;
963
0
        return Status::OK();
964
0
    }
965
11
    const auto& fields = field.get<TYPE_STRUCT>();
966
11
    const int metadata_idx = find_child_idx(variant_field, "metadata");
967
11
    const int value_idx = find_child_idx(variant_field, "value");
968
11
    const int typed_value_idx = find_child_idx(variant_field, "typed_value");
969
970
11
    std::string metadata;
971
11
    bool has_metadata = false;
972
11
    if (inherited_metadata != nullptr) {
973
1
        metadata = *inherited_metadata;
974
1
        has_metadata = true;
975
1
    }
976
11
    if (metadata_idx >= 0) {
977
10
        bool metadata_present = false;
978
10
        RETURN_IF_ERROR(get_binary_field(fields[metadata_idx], &metadata, &metadata_present));
979
10
        has_metadata = metadata_present;
980
10
    }
981
982
11
    VariantMap value_values;
983
11
    bool value_present = false;
984
11
    const PathInData current_path = path->build();
985
11
    if (value_idx >= 0) {
986
9
        std::string value;
987
9
        RETURN_IF_ERROR(get_binary_field(fields[value_idx], &value, &value_present));
988
9
        if (value_present) {
989
6
            if (!has_metadata) {
990
0
                return Status::Corruption("Parquet VARIANT value is present without metadata");
991
0
            }
992
6
            std::string value_json;
993
6
            RETURN_IF_ERROR(parquet::decode_variant_to_json(
994
6
                    StringRef(metadata.data(), metadata.size()),
995
6
                    StringRef(value.data(), value.size()), &value_json));
996
6
            RETURN_IF_ERROR(parse_json_to_variant_map(value_json, current_path, &value_values));
997
6
        }
998
9
    }
999
1000
11
    VariantMap typed_values;
1001
11
    bool typed_present = false;
1002
11
    if (typed_value_idx >= 0) {
1003
9
        RETURN_IF_ERROR(typed_value_to_variant_map(variant_field.children[typed_value_idx],
1004
9
                                                   fields[typed_value_idx], metadata, path,
1005
9
                                                   &typed_values, &typed_present));
1006
9
    }
1007
1008
10
    erase_shadowed_empty_object_markers(&value_values, &typed_values);
1009
10
    auto current_value = value_values.find(current_path);
1010
10
    if (value_present && typed_present && current_value != value_values.end() &&
1011
10
        !is_empty_object_marker(current_value->second)) {
1012
0
        return Status::Corruption(
1013
0
                "Parquet VARIANT has conflicting non-object value and typed_value");
1014
0
    }
1015
10
    RETURN_IF_ERROR(
1016
10
            check_no_shredded_value_typed_duplicates(value_values, typed_values, current_path));
1017
9
    values->merge(std::move(value_values));
1018
9
    values->merge(std::move(typed_values));
1019
9
    *present = value_present || typed_present;
1020
9
    return Status::OK();
1021
10
}
1022
1023
static Status shredded_field_to_variant_map(const FieldSchema& field_schema, const Field& field,
1024
                                            const std::string& metadata, PathInDataBuilder* path,
1025
10
                                            VariantMap* values, bool* present) {
1026
10
    if (is_variant_wrapper_field(field_schema, false, true)) {
1027
1
        return variant_to_variant_map(field_schema, field, &metadata, path, values, present);
1028
1
    }
1029
9
    return typed_value_to_variant_map(field_schema, field, metadata, path, values, present);
1030
10
}
1031
1032
static Status typed_value_to_variant_map(const FieldSchema& typed_value_field, const Field& field,
1033
                                         const std::string& metadata, PathInDataBuilder* path,
1034
18
                                         VariantMap* values, bool* present) {
1035
18
    if (field.is_null()) {
1036
4
        *present = false;
1037
4
        return Status::OK();
1038
4
    }
1039
14
    const DataTypePtr& typed_type = remove_nullable(typed_value_field.data_type);
1040
14
    if (typed_type->get_primitive_type() == TYPE_STRUCT) {
1041
9
        const auto& fields = field.get<TYPE_STRUCT>();
1042
9
        *present = true;
1043
9
        bool has_present_child = false;
1044
19
        for (int i = 0; i < typed_value_field.children.size(); ++i) {
1045
10
            path->append(typed_value_field.children[i].name, false);
1046
10
            bool child_present = false;
1047
10
            RETURN_IF_ERROR(shredded_field_to_variant_map(typed_value_field.children[i], fields[i],
1048
10
                                                          metadata, path, values, &child_present));
1049
10
            has_present_child |= child_present;
1050
10
            path->pop_back();
1051
10
        }
1052
9
        if (!has_present_child) {
1053
3
            RETURN_IF_ERROR(insert_empty_object_marker(path->build(), values));
1054
3
        }
1055
9
        return Status::OK();
1056
9
    }
1057
5
    if (typed_type->get_primitive_type() == TYPE_ARRAY) {
1058
1
        std::string value_json;
1059
1
        RETURN_IF_ERROR(
1060
1
                typed_value_to_json(typed_value_field, field, metadata, &value_json, present));
1061
0
        if (*present) {
1062
0
            RETURN_IF_ERROR(parse_json_to_variant_map(value_json, path->build(), values));
1063
0
        }
1064
0
        return Status::OK();
1065
0
    }
1066
1067
4
    FieldWithDataType value;
1068
4
    RETURN_IF_ERROR(field_to_variant_field(typed_value_field, field, &value, present));
1069
4
    if (*present) {
1070
4
        (*values)[path->build()] = std::move(value);
1071
4
    }
1072
4
    return Status::OK();
1073
4
}
1074
1075
static Status append_direct_typed_column_to_batch(const FieldSchema& field_schema,
1076
                                                  const IColumn& column, size_t start, size_t rows,
1077
                                                  PathInDataBuilder* path, ColumnVariant* batch,
1078
                                                  bool allow_variant_wrapper,
1079
                                                  const std::set<uint64_t>& column_ids,
1080
13
                                                  std::vector<const NullMap*> parent_null_maps) {
1081
13
    if (!has_selected_column(field_schema, column_ids)) {
1082
0
        return Status::OK();
1083
0
    }
1084
1085
13
    const IColumn* value_column = &column;
1086
13
    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(&column)) {
1087
12
        parent_null_maps.push_back(&nullable_column->get_null_map_data());
1088
12
        value_column = &nullable_column->get_nested_column();
1089
12
    }
1090
1091
13
    if (allow_variant_wrapper && is_variant_wrapper_field(field_schema, false)) {
1092
0
        const int typed_value_idx = find_child_idx(field_schema, "typed_value");
1093
0
        DCHECK_GE(typed_value_idx, 0);
1094
0
        const auto& typed_struct = assert_cast<const ColumnStruct&>(*value_column);
1095
0
        return append_direct_typed_column_to_batch(
1096
0
                field_schema.children[typed_value_idx], typed_struct.get_column(typed_value_idx),
1097
0
                start, rows, path, batch, false, column_ids, parent_null_maps);
1098
0
    }
1099
1100
13
    const auto& type = remove_nullable(field_schema.data_type);
1101
13
    if (type->get_primitive_type() == TYPE_STRUCT) {
1102
5
        const auto& struct_column = assert_cast<const ColumnStruct&>(*value_column);
1103
14
        for (int i = 0; i < field_schema.children.size(); ++i) {
1104
9
            if (!has_selected_column(field_schema.children[i], column_ids)) {
1105
0
                continue;
1106
0
            }
1107
9
            path->append(field_schema.children[i].name, false);
1108
9
            RETURN_IF_ERROR(append_direct_typed_column_to_batch(
1109
9
                    field_schema.children[i], struct_column.get_column(i), start, rows, path, batch,
1110
9
                    true, column_ids, parent_null_maps));
1111
9
            path->pop_back();
1112
9
        }
1113
5
        return Status::OK();
1114
5
    }
1115
1116
8
    DataTypePtr variant_leaf_type = make_nullable(direct_variant_leaf_type(field_schema.data_type));
1117
8
    MutableColumnPtr variant_leaf = variant_leaf_type->create_column();
1118
8
    variant_leaf->insert_default();
1119
8
    if (is_temporal_variant_leaf_type(type->get_primitive_type())) {
1120
3
        insert_direct_typed_temporal_leaf_range(type->get_primitive_type(), *value_column, start,
1121
3
                                                rows, parent_null_maps, variant_leaf.get());
1122
5
    } else {
1123
5
        insert_direct_typed_leaf_range(*value_column, start, rows, parent_null_maps,
1124
5
                                       variant_leaf.get());
1125
5
    }
1126
8
    if (!batch->add_sub_column(path->build(), std::move(variant_leaf), variant_leaf_type)) {
1127
0
        return Status::Corruption("Failed to add Parquet VARIANT typed subcolumn {}",
1128
0
                                  path->build().get_path());
1129
0
    }
1130
8
    return Status::OK();
1131
8
}
1132
1133
#ifdef BE_TEST
1134
namespace parquet_variant_reader_test {
1135
4
bool can_direct_read_typed_value_for_test(const FieldSchema& typed_value_field) {
1136
4
    const std::set<uint64_t> column_ids;
1137
4
    return can_direct_read_typed_value(typed_value_field, false, column_ids);
1138
4
}
1139
1140
bool can_use_direct_typed_only_value_for_test(const FieldSchema& variant_field,
1141
5
                                              const std::set<uint64_t>& column_ids) {
1142
5
    return can_use_direct_typed_only_value(variant_field, column_ids);
1143
5
}
1144
1145
Status append_direct_typed_column_to_batch_for_test(const FieldSchema& typed_value_field,
1146
                                                    const IColumn& typed_value_column, size_t start,
1147
4
                                                    size_t rows, ColumnVariant* batch) {
1148
4
    PathInDataBuilder path;
1149
4
    const std::set<uint64_t> column_ids;
1150
4
    return append_direct_typed_column_to_batch(typed_value_field, typed_value_column, start, rows,
1151
4
                                               &path, batch, false, column_ids, {});
1152
4
}
1153
1154
Status read_variant_row_for_test(const FieldSchema& variant_field, const Field& field,
1155
11
                                 bool output_nullable, Field* result, bool* sql_null) {
1156
11
    if (field.is_null()) {
1157
1
        if (!output_nullable) {
1158
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1159
0
        }
1160
1
        *sql_null = true;
1161
1
        return Status::OK();
1162
1
    }
1163
1164
10
    VariantMap values;
1165
10
    bool present = false;
1166
10
    PathInDataBuilder path;
1167
10
    RETURN_IF_ERROR(
1168
10
            variant_to_variant_map(variant_field, field, nullptr, &path, &values, &present));
1169
8
    if (!present) {
1170
1
        values[PathInData()] = FieldWithDataType {.field = Field()};
1171
1
    }
1172
1173
8
    auto variant_column = ColumnVariant::create(0, false);
1174
8
    RETURN_IF_CATCH_EXCEPTION(
1175
8
            variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
1176
8
    variant_column->get(0, *result);
1177
8
    *sql_null = false;
1178
8
    return Status::OK();
1179
8
}
1180
1181
Status variant_to_json_for_test(const FieldSchema& variant_field, const Field& field,
1182
                                const std::string& inherited_metadata, std::string* json,
1183
3
                                bool* present) {
1184
3
    return variant_to_json(variant_field, field, &inherited_metadata, json, present);
1185
3
}
1186
} // namespace parquet_variant_reader_test
1187
#endif
1188
1189
Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
1190
                                   const tparquet::RowGroup& row_group, const RowRanges& row_ranges,
1191
                                   const cctz::time_zone* ctz, io::IOContext* io_ctx,
1192
                                   std::unique_ptr<ParquetColumnReader>& reader,
1193
                                   size_t max_buf_size,
1194
                                   std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
1195
                                   RuntimeState* state, bool in_collection,
1196
                                   const std::set<uint64_t>& column_ids,
1197
132
                                   const std::set<uint64_t>& filter_column_ids) {
1198
132
    size_t total_rows = row_group.num_rows;
1199
132
    if (field->data_type->get_primitive_type() == TYPE_ARRAY) {
1200
2
        std::unique_ptr<ParquetColumnReader> element_reader;
1201
2
        RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
1202
2
                               element_reader, max_buf_size, col_offsets, state, true, column_ids,
1203
2
                               filter_column_ids));
1204
2
        auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
1205
2
        element_reader->set_column_in_nested();
1206
2
        RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
1207
2
        array_reader->_filter_column_ids = filter_column_ids;
1208
2
        reader.reset(array_reader.release());
1209
130
    } else if (field->data_type->get_primitive_type() == TYPE_MAP) {
1210
0
        std::unique_ptr<ParquetColumnReader> key_reader;
1211
0
        std::unique_ptr<ParquetColumnReader> value_reader;
1212
1213
0
        if (column_ids.empty() ||
1214
0
            column_ids.find(field->children[0].get_column_id()) != column_ids.end()) {
1215
            // Create key reader
1216
0
            RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
1217
0
                                   key_reader, max_buf_size, col_offsets, state, true, column_ids,
1218
0
                                   filter_column_ids));
1219
0
        } else {
1220
0
            auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
1221
0
                                                                   io_ctx, &field->children[0]);
1222
0
            key_reader = std::move(skip_reader);
1223
0
        }
1224
1225
0
        if (column_ids.empty() ||
1226
0
            column_ids.find(field->children[1].get_column_id()) != column_ids.end()) {
1227
            // Create value reader
1228
0
            RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
1229
0
                                   value_reader, max_buf_size, col_offsets, state, true, column_ids,
1230
0
                                   filter_column_ids));
1231
0
        } else {
1232
0
            auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
1233
0
                                                                   io_ctx, &field->children[0]);
1234
0
            value_reader = std::move(skip_reader);
1235
0
        }
1236
1237
0
        auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
1238
0
        key_reader->set_column_in_nested();
1239
0
        value_reader->set_column_in_nested();
1240
0
        RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
1241
0
        map_reader->_filter_column_ids = filter_column_ids;
1242
0
        reader.reset(map_reader.release());
1243
130
    } else if (field->data_type->get_primitive_type() == TYPE_STRUCT) {
1244
11
        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> child_readers;
1245
11
        child_readers.reserve(field->children.size());
1246
11
        int non_skip_reader_idx = -1;
1247
37
        for (int i = 0; i < field->children.size(); ++i) {
1248
26
            auto& child = field->children[i];
1249
26
            std::unique_ptr<ParquetColumnReader> child_reader;
1250
26
            if (column_ids.empty() || column_ids.find(child.get_column_id()) != column_ids.end()) {
1251
22
                RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx,
1252
22
                                       child_reader, max_buf_size, col_offsets, state,
1253
22
                                       in_collection, column_ids, filter_column_ids));
1254
22
                child_readers[child.name] = std::move(child_reader);
1255
                // Record the first non-SkippingReader
1256
22
                if (non_skip_reader_idx == -1) {
1257
11
                    non_skip_reader_idx = i;
1258
11
                }
1259
22
            } else {
1260
4
                auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
1261
4
                                                                       io_ctx, &child);
1262
4
                skip_reader->_filter_column_ids = filter_column_ids;
1263
4
                child_readers[child.name] = std::move(skip_reader);
1264
4
            }
1265
26
            child_readers[child.name]->set_column_in_nested();
1266
26
        }
1267
        // If all children are SkipReadingReader, force the first child to call create
1268
11
        if (non_skip_reader_idx == -1) {
1269
0
            std::unique_ptr<ParquetColumnReader> child_reader;
1270
0
            RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
1271
0
                                   child_reader, max_buf_size, col_offsets, state, in_collection,
1272
0
                                   column_ids, filter_column_ids));
1273
0
            child_reader->set_column_in_nested();
1274
0
            child_readers[field->children[0].name] = std::move(child_reader);
1275
0
        }
1276
11
        auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
1277
11
        RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
1278
11
        struct_reader->_filter_column_ids = filter_column_ids;
1279
11
        reader.reset(struct_reader.release());
1280
119
    } else if (remove_nullable(field->data_type)->get_primitive_type() == TYPE_VARIANT) {
1281
0
        auto variant_reader =
1282
0
                VariantColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
1283
0
        RETURN_IF_ERROR(variant_reader->init(file, field, row_group, max_buf_size, col_offsets,
1284
0
                                             state, in_collection, column_ids, filter_column_ids));
1285
0
        variant_reader->_filter_column_ids = filter_column_ids;
1286
0
        reader.reset(variant_reader.release());
1287
119
    } else {
1288
119
        auto physical_index = field->physical_column_index;
1289
119
        const tparquet::OffsetIndex* offset_index =
1290
119
                col_offsets.find(physical_index) != col_offsets.end() ? &col_offsets[physical_index]
1291
119
                                                                      : nullptr;
1292
1293
119
        const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
1294
119
        if (in_collection) {
1295
3
            if (offset_index == nullptr) {
1296
3
                auto scalar_reader = ScalarColumnReader<true, false>::create_unique(
1297
3
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
1298
1299
3
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
1300
3
                scalar_reader->_filter_column_ids = filter_column_ids;
1301
3
                reader.reset(scalar_reader.release());
1302
3
            } else {
1303
0
                auto scalar_reader = ScalarColumnReader<true, true>::create_unique(
1304
0
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
1305
1306
0
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
1307
0
                scalar_reader->_filter_column_ids = filter_column_ids;
1308
0
                reader.reset(scalar_reader.release());
1309
0
            }
1310
116
        } else {
1311
116
            if (offset_index == nullptr) {
1312
116
                auto scalar_reader = ScalarColumnReader<false, false>::create_unique(
1313
116
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
1314
1315
116
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
1316
116
                scalar_reader->_filter_column_ids = filter_column_ids;
1317
116
                reader.reset(scalar_reader.release());
1318
116
            } else {
1319
0
                auto scalar_reader = ScalarColumnReader<false, true>::create_unique(
1320
0
                        row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
1321
1322
0
                RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size, state));
1323
0
                scalar_reader->_filter_column_ids = filter_column_ids;
1324
0
                reader.reset(scalar_reader.release());
1325
0
            }
1326
116
        }
1327
119
    }
1328
132
    return Status::OK();
1329
132
}
1330
1331
void ParquetColumnReader::_generate_read_ranges(RowRange page_row_range,
1332
274
                                                RowRanges* result_ranges) const {
1333
274
    result_ranges->add(page_row_range);
1334
274
    RowRanges::ranges_intersection(*result_ranges, _row_ranges, result_ranges);
1335
274
}
1336
1337
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1338
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::init(io::FileReaderSPtr file,
1339
                                                             FieldSchema* field,
1340
                                                             size_t max_buf_size,
1341
119
                                                             RuntimeState* state) {
1342
119
    _field_schema = field;
1343
119
    auto& chunk_meta = _chunk_meta.meta_data;
1344
119
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
1345
119
                                                    : chunk_meta.data_page_offset;
1346
119
    size_t chunk_len = chunk_meta.total_compressed_size;
1347
119
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
1348
119
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
1349
119
         typeid_cast<io::MergeRangeFileReader*>(
1350
53
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
1351
119
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
1352
        // turn off prefetch data when using MergeRangeFileReader
1353
119
        prefetch_buffer_size = 0;
1354
119
    }
1355
119
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
1356
119
                                                                    prefetch_buffer_size);
1357
119
    ParquetPageReadContext ctx(
1358
119
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
1359
1360
119
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
1361
119
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
1362
119
    RETURN_IF_ERROR(_chunk_reader->init());
1363
119
    return Status::OK();
1364
119
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
_ZN5doris18ScalarColumnReaderILb1ELb0EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
Line
Count
Source
1341
3
                                                             RuntimeState* state) {
1342
3
    _field_schema = field;
1343
3
    auto& chunk_meta = _chunk_meta.meta_data;
1344
3
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
1345
3
                                                    : chunk_meta.data_page_offset;
1346
3
    size_t chunk_len = chunk_meta.total_compressed_size;
1347
3
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
1348
3
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
1349
3
         typeid_cast<io::MergeRangeFileReader*>(
1350
0
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
1351
3
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
1352
        // turn off prefetch data when using MergeRangeFileReader
1353
3
        prefetch_buffer_size = 0;
1354
3
    }
1355
3
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
1356
3
                                                                    prefetch_buffer_size);
1357
3
    ParquetPageReadContext ctx(
1358
3
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
1359
1360
3
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
1361
3
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
1362
3
    RETURN_IF_ERROR(_chunk_reader->init());
1363
3
    return Status::OK();
1364
3
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
_ZN5doris18ScalarColumnReaderILb0ELb0EE4initESt10shared_ptrINS_2io10FileReaderEEPNS_11FieldSchemaEmPNS_12RuntimeStateE
Line
Count
Source
1341
116
                                                             RuntimeState* state) {
1342
116
    _field_schema = field;
1343
116
    auto& chunk_meta = _chunk_meta.meta_data;
1344
116
    int64_t chunk_start = has_dict_page(chunk_meta) ? chunk_meta.dictionary_page_offset
1345
116
                                                    : chunk_meta.data_page_offset;
1346
116
    size_t chunk_len = chunk_meta.total_compressed_size;
1347
116
    size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
1348
116
    if ((typeid_cast<doris::io::TracingFileReader*>(file.get()) &&
1349
116
         typeid_cast<io::MergeRangeFileReader*>(
1350
53
                 ((doris::io::TracingFileReader*)(file.get()))->inner_reader().get())) ||
1351
116
        typeid_cast<io::MergeRangeFileReader*>(file.get())) {
1352
        // turn off prefetch data when using MergeRangeFileReader
1353
116
        prefetch_buffer_size = 0;
1354
116
    }
1355
116
    _stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
1356
116
                                                                    prefetch_buffer_size);
1357
116
    ParquetPageReadContext ctx(
1358
116
            (state == nullptr) ? true : state->query_options().enable_parquet_file_page_cache);
1359
1360
116
    _chunk_reader = std::make_unique<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>>(
1361
116
            _stream_reader.get(), &_chunk_meta, field, _offset_index, _total_rows, _io_ctx, ctx);
1362
116
    RETURN_IF_ERROR(_chunk_reader->init());
1363
116
    return Status::OK();
1364
116
}
1365
1366
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1367
244
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_skip_values(size_t num_values) {
1368
244
    if (num_values == 0) {
1369
142
        return Status::OK();
1370
142
    }
1371
102
    if (_chunk_reader->max_def_level() > 0) {
1372
102
        LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
1373
102
        size_t skipped = 0;
1374
102
        size_t null_size = 0;
1375
102
        size_t nonnull_size = 0;
1376
217
        while (skipped < num_values) {
1377
115
            level_t def_level = -1;
1378
115
            size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
1379
115
            if (loop_skip == 0) {
1380
0
                std::stringstream ss;
1381
0
                auto& bit_reader = def_decoder.rle_decoder().bit_reader();
1382
0
                ss << "def_decoder buffer (hex): ";
1383
0
                for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
1384
0
                    ss << std::hex << std::setw(2) << std::setfill('0')
1385
0
                       << static_cast<int>(bit_reader.buffer()[i]) << " ";
1386
0
                }
1387
0
                LOG(WARNING) << ss.str();
1388
0
                return Status::InternalError("Failed to decode definition level.");
1389
0
            }
1390
115
            if (def_level < _field_schema->definition_level) {
1391
8
                null_size += loop_skip;
1392
107
            } else {
1393
107
                nonnull_size += loop_skip;
1394
107
            }
1395
115
            skipped += loop_skip;
1396
115
        }
1397
102
        if (null_size > 0) {
1398
5
            RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
1399
5
        }
1400
102
        if (nonnull_size > 0) {
1401
101
            RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
1402
101
        }
1403
102
    } else {
1404
0
        RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
1405
0
    }
1406
102
    return Status::OK();
1407
102
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE12_skip_valuesEm
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE12_skip_valuesEm
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE12_skip_valuesEm
_ZN5doris18ScalarColumnReaderILb0ELb0EE12_skip_valuesEm
Line
Count
Source
1367
244
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_skip_values(size_t num_values) {
1368
244
    if (num_values == 0) {
1369
142
        return Status::OK();
1370
142
    }
1371
102
    if (_chunk_reader->max_def_level() > 0) {
1372
102
        LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
1373
102
        size_t skipped = 0;
1374
102
        size_t null_size = 0;
1375
102
        size_t nonnull_size = 0;
1376
217
        while (skipped < num_values) {
1377
115
            level_t def_level = -1;
1378
115
            size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
1379
115
            if (loop_skip == 0) {
1380
0
                std::stringstream ss;
1381
0
                auto& bit_reader = def_decoder.rle_decoder().bit_reader();
1382
0
                ss << "def_decoder buffer (hex): ";
1383
0
                for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
1384
0
                    ss << std::hex << std::setw(2) << std::setfill('0')
1385
0
                       << static_cast<int>(bit_reader.buffer()[i]) << " ";
1386
0
                }
1387
0
                LOG(WARNING) << ss.str();
1388
0
                return Status::InternalError("Failed to decode definition level.");
1389
0
            }
1390
115
            if (def_level < _field_schema->definition_level) {
1391
8
                null_size += loop_skip;
1392
107
            } else {
1393
107
                nonnull_size += loop_skip;
1394
107
            }
1395
115
            skipped += loop_skip;
1396
115
        }
1397
102
        if (null_size > 0) {
1398
5
            RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
1399
5
        }
1400
102
        if (nonnull_size > 0) {
1401
101
            RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
1402
101
        }
1403
102
    } else {
1404
0
        RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
1405
0
    }
1406
102
    return Status::OK();
1407
102
}
1408
1409
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1410
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_values,
1411
                                                                     ColumnPtr& doris_column,
1412
                                                                     DataTypePtr& type,
1413
                                                                     FilterMap& filter_map,
1414
244
                                                                     bool is_dict_filter) {
1415
244
    if (num_values == 0) {
1416
0
        return Status::OK();
1417
0
    }
1418
244
    MutableColumnPtr data_column;
1419
244
    std::vector<uint16_t> null_map;
1420
244
    NullMap* map_data_column = nullptr;
1421
244
    if (doris_column->is_nullable()) {
1422
242
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1423
        // doris_column either originates from a mutable block in vparquet_group_reader
1424
        // or is a newly created ColumnPtr, and therefore can be modified.
1425
242
        auto* nullable_column =
1426
242
                assert_cast<ColumnNullable*>(const_cast<IColumn*>(doris_column.get()));
1427
1428
242
        data_column = nullable_column->get_nested_column_ptr();
1429
242
        map_data_column = &(nullable_column->get_null_map_data());
1430
242
        if (_chunk_reader->max_def_level() > 0) {
1431
174
            LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
1432
174
            size_t has_read = 0;
1433
174
            bool prev_is_null = true;
1434
348
            while (has_read < num_values) {
1435
174
                level_t def_level;
1436
174
                size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
1437
174
                if (loop_read == 0) {
1438
0
                    std::stringstream ss;
1439
0
                    auto& bit_reader = def_decoder.rle_decoder().bit_reader();
1440
0
                    ss << "def_decoder buffer (hex): ";
1441
0
                    for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
1442
0
                        ss << std::hex << std::setw(2) << std::setfill('0')
1443
0
                           << static_cast<int>(bit_reader.buffer()[i]) << " ";
1444
0
                    }
1445
0
                    LOG(WARNING) << ss.str();
1446
0
                    return Status::InternalError("Failed to decode definition level.");
1447
0
                }
1448
1449
174
                bool is_null = def_level < _field_schema->definition_level;
1450
174
                if (!(prev_is_null ^ is_null)) {
1451
57
                    null_map.emplace_back(0);
1452
57
                }
1453
174
                size_t remaining = loop_read;
1454
174
                while (remaining > USHRT_MAX) {
1455
0
                    null_map.emplace_back(USHRT_MAX);
1456
0
                    null_map.emplace_back(0);
1457
0
                    remaining -= USHRT_MAX;
1458
0
                }
1459
174
                null_map.emplace_back((u_short)remaining);
1460
174
                prev_is_null = is_null;
1461
174
                has_read += loop_read;
1462
174
            }
1463
174
        }
1464
242
    } else {
1465
2
        if (_chunk_reader->max_def_level() > 0) {
1466
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1467
0
        }
1468
2
        data_column = doris_column->assume_mutable();
1469
2
    }
1470
244
    if (null_map.size() == 0) {
1471
70
        size_t remaining = num_values;
1472
70
        while (remaining > USHRT_MAX) {
1473
0
            null_map.emplace_back(USHRT_MAX);
1474
0
            null_map.emplace_back(0);
1475
0
            remaining -= USHRT_MAX;
1476
0
        }
1477
70
        null_map.emplace_back((u_short)remaining);
1478
70
    }
1479
244
    ColumnSelectVector select_vector;
1480
244
    {
1481
244
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1482
244
        RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map,
1483
244
                                           _filter_map_index));
1484
244
        _filter_map_index += num_values;
1485
244
    }
1486
0
    return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
1487
244
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
_ZN5doris18ScalarColumnReaderILb0ELb0EE12_read_valuesEmRNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEb
Line
Count
Source
1414
244
                                                                     bool is_dict_filter) {
1415
244
    if (num_values == 0) {
1416
0
        return Status::OK();
1417
0
    }
1418
244
    MutableColumnPtr data_column;
1419
244
    std::vector<uint16_t> null_map;
1420
244
    NullMap* map_data_column = nullptr;
1421
244
    if (doris_column->is_nullable()) {
1422
242
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1423
        // doris_column either originates from a mutable block in vparquet_group_reader
1424
        // or is a newly created ColumnPtr, and therefore can be modified.
1425
242
        auto* nullable_column =
1426
242
                assert_cast<ColumnNullable*>(const_cast<IColumn*>(doris_column.get()));
1427
1428
242
        data_column = nullable_column->get_nested_column_ptr();
1429
242
        map_data_column = &(nullable_column->get_null_map_data());
1430
242
        if (_chunk_reader->max_def_level() > 0) {
1431
174
            LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
1432
174
            size_t has_read = 0;
1433
174
            bool prev_is_null = true;
1434
348
            while (has_read < num_values) {
1435
174
                level_t def_level;
1436
174
                size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
1437
174
                if (loop_read == 0) {
1438
0
                    std::stringstream ss;
1439
0
                    auto& bit_reader = def_decoder.rle_decoder().bit_reader();
1440
0
                    ss << "def_decoder buffer (hex): ";
1441
0
                    for (size_t i = 0; i < bit_reader.max_bytes(); ++i) {
1442
0
                        ss << std::hex << std::setw(2) << std::setfill('0')
1443
0
                           << static_cast<int>(bit_reader.buffer()[i]) << " ";
1444
0
                    }
1445
0
                    LOG(WARNING) << ss.str();
1446
0
                    return Status::InternalError("Failed to decode definition level.");
1447
0
                }
1448
1449
174
                bool is_null = def_level < _field_schema->definition_level;
1450
174
                if (!(prev_is_null ^ is_null)) {
1451
57
                    null_map.emplace_back(0);
1452
57
                }
1453
174
                size_t remaining = loop_read;
1454
174
                while (remaining > USHRT_MAX) {
1455
0
                    null_map.emplace_back(USHRT_MAX);
1456
0
                    null_map.emplace_back(0);
1457
0
                    remaining -= USHRT_MAX;
1458
0
                }
1459
174
                null_map.emplace_back((u_short)remaining);
1460
174
                prev_is_null = is_null;
1461
174
                has_read += loop_read;
1462
174
            }
1463
174
        }
1464
242
    } else {
1465
2
        if (_chunk_reader->max_def_level() > 0) {
1466
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1467
0
        }
1468
2
        data_column = doris_column->assume_mutable();
1469
2
    }
1470
244
    if (null_map.size() == 0) {
1471
70
        size_t remaining = num_values;
1472
70
        while (remaining > USHRT_MAX) {
1473
0
            null_map.emplace_back(USHRT_MAX);
1474
0
            null_map.emplace_back(0);
1475
0
            remaining -= USHRT_MAX;
1476
0
        }
1477
70
        null_map.emplace_back((u_short)remaining);
1478
70
    }
1479
244
    ColumnSelectVector select_vector;
1480
244
    {
1481
244
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1482
244
        RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map,
1483
244
                                           _filter_map_index));
1484
244
        _filter_map_index += num_values;
1485
244
    }
1486
0
    return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
1487
244
}
1488
1489
/**
1490
 * Load the nested column data of complex type.
1491
 * A row of complex type may be stored across two(or more) pages, and the parameter `align_rows` indicates that
1492
 * whether the reader should read the remaining value of the last row in previous page.
1493
 */
1494
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1495
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_nested_column(
1496
        ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, size_t batch_size,
1497
13
        size_t* read_rows, bool* eof, bool is_dict_filter) {
1498
13
    _rep_levels.clear();
1499
13
    _def_levels.clear();
1500
1501
    // Handle nullable columns
1502
13
    MutableColumnPtr data_column;
1503
13
    NullMap* map_data_column = nullptr;
1504
13
    if (doris_column->is_nullable()) {
1505
13
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1506
        // doris_column either originates from a mutable block in vparquet_group_reader
1507
        // or is a newly created ColumnPtr, and therefore can be modified.
1508
13
        auto* nullable_column =
1509
13
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
1510
13
        data_column = nullable_column->get_nested_column_ptr();
1511
13
        map_data_column = &(nullable_column->get_null_map_data());
1512
13
    } else {
1513
0
        if (_field_schema->data_type->is_nullable()) {
1514
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1515
0
        }
1516
0
        data_column = doris_column->assume_mutable();
1517
0
    }
1518
1519
13
    std::vector<uint16_t> null_map;
1520
13
    std::unordered_set<size_t> ancestor_null_indices;
1521
13
    std::vector<uint8_t> nested_filter_map_data;
1522
1523
13
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
1524
13
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
1525
13
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
1526
13
        if (filter_map.has_filter()) {
1527
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
1528
0
                                           _rep_levels.size(), nested_filter_map_data,
1529
0
                                           &nested_filter_map));
1530
0
        }
1531
1532
13
        null_map.clear();
1533
13
        ancestor_null_indices.clear();
1534
13
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
1535
13
                                            ancestor_null_indices));
1536
1537
13
        ColumnSelectVector select_vector;
1538
13
        {
1539
13
            SCOPED_RAW_TIMER(&_decode_null_map_time);
1540
13
            RETURN_IF_ERROR(select_vector.init(
1541
13
                    null_map,
1542
13
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
1543
13
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
1544
13
        }
1545
1546
13
        RETURN_IF_ERROR(
1547
13
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
1548
13
        if (ancestor_null_indices.size() != 0) {
1549
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
1550
0
        }
1551
13
        if (filter_map.has_filter()) {
1552
0
            auto new_rep_sz = before_rep_level_sz;
1553
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
1554
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
1555
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
1556
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
1557
0
                    new_rep_sz++;
1558
0
                }
1559
0
            }
1560
0
            _rep_levels.resize(new_rep_sz);
1561
0
            _def_levels.resize(new_rep_sz);
1562
0
        }
1563
13
        return Status::OK();
1564
13
    };
Unexecuted instantiation: _ZZN5doris18ScalarColumnReaderILb1ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
_ZZN5doris18ScalarColumnReaderILb1ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
Line
Count
Source
1523
3
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
1524
3
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
1525
3
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
1526
3
        if (filter_map.has_filter()) {
1527
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
1528
0
                                           _rep_levels.size(), nested_filter_map_data,
1529
0
                                           &nested_filter_map));
1530
0
        }
1531
1532
3
        null_map.clear();
1533
3
        ancestor_null_indices.clear();
1534
3
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
1535
3
                                            ancestor_null_indices));
1536
1537
3
        ColumnSelectVector select_vector;
1538
3
        {
1539
3
            SCOPED_RAW_TIMER(&_decode_null_map_time);
1540
3
            RETURN_IF_ERROR(select_vector.init(
1541
3
                    null_map,
1542
3
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
1543
3
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
1544
3
        }
1545
1546
3
        RETURN_IF_ERROR(
1547
3
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
1548
3
        if (ancestor_null_indices.size() != 0) {
1549
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
1550
0
        }
1551
3
        if (filter_map.has_filter()) {
1552
0
            auto new_rep_sz = before_rep_level_sz;
1553
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
1554
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
1555
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
1556
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
1557
0
                    new_rep_sz++;
1558
0
                }
1559
0
            }
1560
0
            _rep_levels.resize(new_rep_sz);
1561
0
            _def_levels.resize(new_rep_sz);
1562
0
        }
1563
3
        return Status::OK();
1564
3
    };
Unexecuted instantiation: _ZZN5doris18ScalarColumnReaderILb0ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
_ZZN5doris18ScalarColumnReaderILb0ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbbENKUlmmE_clEmm
Line
Count
Source
1523
10
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
1524
10
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
1525
10
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
1526
10
        if (filter_map.has_filter()) {
1527
0
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
1528
0
                                           _rep_levels.size(), nested_filter_map_data,
1529
0
                                           &nested_filter_map));
1530
0
        }
1531
1532
10
        null_map.clear();
1533
10
        ancestor_null_indices.clear();
1534
10
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
1535
10
                                            ancestor_null_indices));
1536
1537
10
        ColumnSelectVector select_vector;
1538
10
        {
1539
10
            SCOPED_RAW_TIMER(&_decode_null_map_time);
1540
10
            RETURN_IF_ERROR(select_vector.init(
1541
10
                    null_map,
1542
10
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
1543
10
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
1544
10
        }
1545
1546
10
        RETURN_IF_ERROR(
1547
10
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
1548
10
        if (ancestor_null_indices.size() != 0) {
1549
0
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
1550
0
        }
1551
10
        if (filter_map.has_filter()) {
1552
0
            auto new_rep_sz = before_rep_level_sz;
1553
0
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
1554
0
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
1555
0
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
1556
0
                    _def_levels[new_rep_sz] = _def_levels[idx];
1557
0
                    new_rep_sz++;
1558
0
                }
1559
0
            }
1560
0
            _rep_levels.resize(new_rep_sz);
1561
0
            _def_levels.resize(new_rep_sz);
1562
0
        }
1563
10
        return Status::OK();
1564
10
    };
1565
1566
15
    while (_current_range_idx < _row_ranges.range_size()) {
1567
13
        size_t left_row =
1568
13
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
1569
13
        size_t right_row = std::min(left_row + batch_size - *read_rows,
1570
13
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
1571
13
        _current_row_index = left_row;
1572
13
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
1573
13
        size_t load_rows = 0;
1574
13
        bool cross_page = false;
1575
13
        size_t before_rep_level_sz = _rep_levels.size();
1576
13
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
1577
13
                                                             &load_rows, &cross_page));
1578
13
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
1579
13
        _filter_map_index += load_rows;
1580
13
        while (cross_page) {
1581
0
            before_rep_level_sz = _rep_levels.size();
1582
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
1583
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
1584
0
        }
1585
13
        *read_rows += load_rows;
1586
13
        _current_row_index += load_rows;
1587
13
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
1588
13
        if (*read_rows == batch_size) {
1589
11
            break;
1590
11
        }
1591
13
    }
1592
13
    *eof = _current_range_idx == _row_ranges.range_size();
1593
13
    return Status::OK();
1594
13
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
_ZN5doris18ScalarColumnReaderILb1ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
Line
Count
Source
1497
3
        size_t* read_rows, bool* eof, bool is_dict_filter) {
1498
3
    _rep_levels.clear();
1499
3
    _def_levels.clear();
1500
1501
    // Handle nullable columns
1502
3
    MutableColumnPtr data_column;
1503
3
    NullMap* map_data_column = nullptr;
1504
3
    if (doris_column->is_nullable()) {
1505
3
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1506
        // doris_column either originates from a mutable block in vparquet_group_reader
1507
        // or is a newly created ColumnPtr, and therefore can be modified.
1508
3
        auto* nullable_column =
1509
3
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
1510
3
        data_column = nullable_column->get_nested_column_ptr();
1511
3
        map_data_column = &(nullable_column->get_null_map_data());
1512
3
    } else {
1513
0
        if (_field_schema->data_type->is_nullable()) {
1514
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1515
0
        }
1516
0
        data_column = doris_column->assume_mutable();
1517
0
    }
1518
1519
3
    std::vector<uint16_t> null_map;
1520
3
    std::unordered_set<size_t> ancestor_null_indices;
1521
3
    std::vector<uint8_t> nested_filter_map_data;
1522
1523
3
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
1524
3
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
1525
3
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
1526
3
        if (filter_map.has_filter()) {
1527
3
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
1528
3
                                           _rep_levels.size(), nested_filter_map_data,
1529
3
                                           &nested_filter_map));
1530
3
        }
1531
1532
3
        null_map.clear();
1533
3
        ancestor_null_indices.clear();
1534
3
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
1535
3
                                            ancestor_null_indices));
1536
1537
3
        ColumnSelectVector select_vector;
1538
3
        {
1539
3
            SCOPED_RAW_TIMER(&_decode_null_map_time);
1540
3
            RETURN_IF_ERROR(select_vector.init(
1541
3
                    null_map,
1542
3
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
1543
3
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
1544
3
        }
1545
1546
3
        RETURN_IF_ERROR(
1547
3
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
1548
3
        if (ancestor_null_indices.size() != 0) {
1549
3
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
1550
3
        }
1551
3
        if (filter_map.has_filter()) {
1552
3
            auto new_rep_sz = before_rep_level_sz;
1553
3
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
1554
3
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
1555
3
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
1556
3
                    _def_levels[new_rep_sz] = _def_levels[idx];
1557
3
                    new_rep_sz++;
1558
3
                }
1559
3
            }
1560
3
            _rep_levels.resize(new_rep_sz);
1561
3
            _def_levels.resize(new_rep_sz);
1562
3
        }
1563
3
        return Status::OK();
1564
3
    };
1565
1566
3
    while (_current_range_idx < _row_ranges.range_size()) {
1567
3
        size_t left_row =
1568
3
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
1569
3
        size_t right_row = std::min(left_row + batch_size - *read_rows,
1570
3
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
1571
3
        _current_row_index = left_row;
1572
3
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
1573
3
        size_t load_rows = 0;
1574
3
        bool cross_page = false;
1575
3
        size_t before_rep_level_sz = _rep_levels.size();
1576
3
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
1577
3
                                                             &load_rows, &cross_page));
1578
3
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
1579
3
        _filter_map_index += load_rows;
1580
3
        while (cross_page) {
1581
0
            before_rep_level_sz = _rep_levels.size();
1582
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
1583
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
1584
0
        }
1585
3
        *read_rows += load_rows;
1586
3
        _current_row_index += load_rows;
1587
3
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
1588
3
        if (*read_rows == batch_size) {
1589
3
            break;
1590
3
        }
1591
3
    }
1592
3
    *eof = _current_range_idx == _row_ranges.range_size();
1593
3
    return Status::OK();
1594
3
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
_ZN5doris18ScalarColumnReaderILb0ELb0EE19_read_nested_columnERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERSt10shared_ptrIKNS_9IDataTypeEERNS_9FilterMapEmPmPbb
Line
Count
Source
1497
10
        size_t* read_rows, bool* eof, bool is_dict_filter) {
1498
10
    _rep_levels.clear();
1499
10
    _def_levels.clear();
1500
1501
    // Handle nullable columns
1502
10
    MutableColumnPtr data_column;
1503
10
    NullMap* map_data_column = nullptr;
1504
10
    if (doris_column->is_nullable()) {
1505
10
        SCOPED_RAW_TIMER(&_decode_null_map_time);
1506
        // doris_column either originates from a mutable block in vparquet_group_reader
1507
        // or is a newly created ColumnPtr, and therefore can be modified.
1508
10
        auto* nullable_column =
1509
10
                const_cast<ColumnNullable*>(assert_cast<const ColumnNullable*>(doris_column.get()));
1510
10
        data_column = nullable_column->get_nested_column_ptr();
1511
10
        map_data_column = &(nullable_column->get_null_map_data());
1512
10
    } else {
1513
0
        if (_field_schema->data_type->is_nullable()) {
1514
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1515
0
        }
1516
0
        data_column = doris_column->assume_mutable();
1517
0
    }
1518
1519
10
    std::vector<uint16_t> null_map;
1520
10
    std::unordered_set<size_t> ancestor_null_indices;
1521
10
    std::vector<uint8_t> nested_filter_map_data;
1522
1523
10
    auto read_and_fill_data = [&](size_t before_rep_level_sz, size_t filter_map_index) {
1524
10
        RETURN_IF_ERROR(_chunk_reader->fill_def(_def_levels));
1525
10
        std::unique_ptr<FilterMap> nested_filter_map = std::make_unique<FilterMap>();
1526
10
        if (filter_map.has_filter()) {
1527
10
            RETURN_IF_ERROR(gen_filter_map(filter_map, filter_map_index, before_rep_level_sz,
1528
10
                                           _rep_levels.size(), nested_filter_map_data,
1529
10
                                           &nested_filter_map));
1530
10
        }
1531
1532
10
        null_map.clear();
1533
10
        ancestor_null_indices.clear();
1534
10
        RETURN_IF_ERROR(gen_nested_null_map(before_rep_level_sz, _rep_levels.size(), null_map,
1535
10
                                            ancestor_null_indices));
1536
1537
10
        ColumnSelectVector select_vector;
1538
10
        {
1539
10
            SCOPED_RAW_TIMER(&_decode_null_map_time);
1540
10
            RETURN_IF_ERROR(select_vector.init(
1541
10
                    null_map,
1542
10
                    _rep_levels.size() - before_rep_level_sz - ancestor_null_indices.size(),
1543
10
                    map_data_column, nested_filter_map.get(), 0, &ancestor_null_indices));
1544
10
        }
1545
1546
10
        RETURN_IF_ERROR(
1547
10
                _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter));
1548
10
        if (ancestor_null_indices.size() != 0) {
1549
10
            RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_null_indices.size(), false));
1550
10
        }
1551
10
        if (filter_map.has_filter()) {
1552
10
            auto new_rep_sz = before_rep_level_sz;
1553
10
            for (size_t idx = before_rep_level_sz; idx < _rep_levels.size(); idx++) {
1554
10
                if (nested_filter_map_data[idx - before_rep_level_sz]) {
1555
10
                    _rep_levels[new_rep_sz] = _rep_levels[idx];
1556
10
                    _def_levels[new_rep_sz] = _def_levels[idx];
1557
10
                    new_rep_sz++;
1558
10
                }
1559
10
            }
1560
10
            _rep_levels.resize(new_rep_sz);
1561
10
            _def_levels.resize(new_rep_sz);
1562
10
        }
1563
10
        return Status::OK();
1564
10
    };
1565
1566
12
    while (_current_range_idx < _row_ranges.range_size()) {
1567
10
        size_t left_row =
1568
10
                std::max(_current_row_index, _row_ranges.get_range_from(_current_range_idx));
1569
10
        size_t right_row = std::min(left_row + batch_size - *read_rows,
1570
10
                                    (size_t)_row_ranges.get_range_to(_current_range_idx));
1571
10
        _current_row_index = left_row;
1572
10
        RETURN_IF_ERROR(_chunk_reader->seek_to_nested_row(left_row));
1573
10
        size_t load_rows = 0;
1574
10
        bool cross_page = false;
1575
10
        size_t before_rep_level_sz = _rep_levels.size();
1576
10
        RETURN_IF_ERROR(_chunk_reader->load_page_nested_rows(_rep_levels, right_row - left_row,
1577
10
                                                             &load_rows, &cross_page));
1578
10
        RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index));
1579
10
        _filter_map_index += load_rows;
1580
10
        while (cross_page) {
1581
0
            before_rep_level_sz = _rep_levels.size();
1582
0
            RETURN_IF_ERROR(_chunk_reader->load_cross_page_nested_row(_rep_levels, &cross_page));
1583
0
            RETURN_IF_ERROR(read_and_fill_data(before_rep_level_sz, _filter_map_index - 1));
1584
0
        }
1585
10
        *read_rows += load_rows;
1586
10
        _current_row_index += load_rows;
1587
10
        _current_range_idx += (_current_row_index == _row_ranges.get_range_to(_current_range_idx));
1588
10
        if (*read_rows == batch_size) {
1589
8
            break;
1590
8
        }
1591
10
    }
1592
10
    *eof = _current_range_idx == _row_ranges.range_size();
1593
10
    return Status::OK();
1594
10
}
1595
1596
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1597
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_dict_values_to_column(
1598
2
        MutableColumnPtr& doris_column, bool* has_dict) {
1599
2
    bool loaded;
1600
2
    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
1601
2
    if (loaded && *has_dict) {
1602
2
        return _chunk_reader->read_dict_values_to_column(doris_column);
1603
2
    }
1604
0
    return Status::OK();
1605
2
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
_ZN5doris18ScalarColumnReaderILb0ELb0EE26read_dict_values_to_columnERNS_3COWINS_7IColumnEE11mutable_ptrIS3_EEPb
Line
Count
Source
1598
2
        MutableColumnPtr& doris_column, bool* has_dict) {
1599
2
    bool loaded;
1600
2
    RETURN_IF_ERROR(_try_load_dict_page(&loaded, has_dict));
1601
2
    if (loaded && *has_dict) {
1602
2
        return _chunk_reader->read_dict_values_to_column(doris_column);
1603
2
    }
1604
0
    return Status::OK();
1605
2
}
1606
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1607
Result<MutableColumnPtr>
1608
ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::convert_dict_column_to_string_column(
1609
0
        const ColumnInt32* dict_column) {
1610
0
    return _chunk_reader->convert_dict_column_to_string_column(dict_column);
1611
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE36convert_dict_column_to_string_columnEPKNS_12ColumnVectorILNS_13PrimitiveTypeE5EEE
1612
1613
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1614
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_try_load_dict_page(bool* loaded,
1615
2
                                                                            bool* has_dict) {
1616
    // _chunk_reader init will load first page header to check whether has dict page
1617
2
    *loaded = true;
1618
2
    *has_dict = _chunk_reader->has_dict();
1619
2
    return Status::OK();
1620
2
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19_try_load_dict_pageEPbS2_
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE19_try_load_dict_pageEPbS2_
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19_try_load_dict_pageEPbS2_
_ZN5doris18ScalarColumnReaderILb0ELb0EE19_try_load_dict_pageEPbS2_
Line
Count
Source
1615
2
                                                                            bool* has_dict) {
1616
    // _chunk_reader init will load first page header to check whether has dict page
1617
2
    *loaded = true;
1618
2
    *has_dict = _chunk_reader->has_dict();
1619
2
    return Status::OK();
1620
2
}
1621
1622
template <bool IN_COLLECTION, bool OFFSET_INDEX>
1623
Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
1624
        ColumnPtr& doris_column, const DataTypePtr& type,
1625
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
1626
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
1627
287
        int64_t real_column_size) {
1628
287
    if (_converter == nullptr) {
1629
114
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
1630
114
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
1631
114
        if (!_converter->support()) {
1632
0
            return Status::InternalError(
1633
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
1634
0
                    "src_logical_type: {}, dst_logical_type: {}",
1635
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
1636
0
                    _field_schema->data_type->get_name(), type->get_name());
1637
0
        }
1638
114
    }
1639
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
1640
287
    ColumnPtr resolved_column =
1641
287
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
1642
287
                                            doris_column, type, is_dict_filter);
1643
287
    DataTypePtr& resolved_type = _converter->get_physical_type();
1644
1645
287
    _def_levels.clear();
1646
287
    _rep_levels.clear();
1647
287
    *read_rows = 0;
1648
1649
287
    if (_in_nested) {
1650
13
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
1651
13
                                            read_rows, eof, is_dict_filter));
1652
13
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
1653
13
                                   is_dict_filter);
1654
13
    }
1655
1656
274
    int64_t right_row = 0;
1657
274
    if constexpr (OFFSET_INDEX == false) {
1658
274
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1659
274
        right_row = _chunk_reader->page_end_row();
1660
274
    } else {
1661
0
        right_row = _chunk_reader->page_end_row();
1662
0
    }
1663
1664
274
    do {
1665
        // generate the row ranges that should be read
1666
274
        RowRanges read_ranges;
1667
274
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
1668
274
        if (read_ranges.count() == 0) {
1669
            // skip the whole page
1670
63
            _current_row_index = right_row;
1671
211
        } else {
1672
211
            bool skip_whole_batch = false;
1673
            // Determining whether to skip page or batch will increase the calculation time.
1674
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
1675
211
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
1676
                // lazy read
1677
0
                size_t remaining_num_values = read_ranges.count();
1678
0
                if (batch_size >= remaining_num_values &&
1679
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
1680
                    // We can skip the whole page if the remaining values are filtered by predicate columns
1681
0
                    _filter_map_index += remaining_num_values;
1682
0
                    _current_row_index = right_row;
1683
0
                    *read_rows = remaining_num_values;
1684
0
                    break;
1685
0
                }
1686
0
                skip_whole_batch = batch_size <= remaining_num_values &&
1687
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
1688
0
                if (skip_whole_batch) {
1689
0
                    _filter_map_index += batch_size;
1690
0
                }
1691
0
            }
1692
            // load page data to decode or skip values
1693
211
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1694
211
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
1695
211
            size_t has_read = 0;
1696
344
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
1697
244
                auto range = read_ranges.get_range(idx);
1698
                // generate the skipped values
1699
244
                size_t skip_values = range.from() - _current_row_index;
1700
244
                RETURN_IF_ERROR(_skip_values(skip_values));
1701
244
                _current_row_index += skip_values;
1702
                // generate the read values
1703
244
                size_t read_values =
1704
244
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
1705
244
                if (skip_whole_batch) {
1706
0
                    RETURN_IF_ERROR(_skip_values(read_values));
1707
244
                } else {
1708
244
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
1709
244
                                                 filter_map, is_dict_filter));
1710
244
                }
1711
244
                has_read += read_values;
1712
244
                *read_rows += read_values;
1713
244
                _current_row_index += read_values;
1714
244
                if (has_read == batch_size) {
1715
111
                    break;
1716
111
                }
1717
244
            }
1718
211
        }
1719
274
    } while (false);
1720
1721
274
    if (right_row == _current_row_index) {
1722
101
        if (!_chunk_reader->has_next_page()) {
1723
101
            *eof = true;
1724
101
        } else {
1725
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
1726
0
        }
1727
101
    }
1728
1729
274
    {
1730
274
        SCOPED_RAW_TIMER(&_convert_time);
1731
274
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
1732
274
                                            doris_column, is_dict_filter));
1733
274
    }
1734
274
    return Status::OK();
1735
274
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
_ZN5doris18ScalarColumnReaderILb1ELb0EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
Line
Count
Source
1627
3
        int64_t real_column_size) {
1628
3
    if (_converter == nullptr) {
1629
3
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
1630
3
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
1631
3
        if (!_converter->support()) {
1632
0
            return Status::InternalError(
1633
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
1634
0
                    "src_logical_type: {}, dst_logical_type: {}",
1635
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
1636
0
                    _field_schema->data_type->get_name(), type->get_name());
1637
0
        }
1638
3
    }
1639
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
1640
3
    ColumnPtr resolved_column =
1641
3
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
1642
3
                                            doris_column, type, is_dict_filter);
1643
3
    DataTypePtr& resolved_type = _converter->get_physical_type();
1644
1645
3
    _def_levels.clear();
1646
3
    _rep_levels.clear();
1647
3
    *read_rows = 0;
1648
1649
3
    if (_in_nested) {
1650
3
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
1651
3
                                            read_rows, eof, is_dict_filter));
1652
3
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
1653
3
                                   is_dict_filter);
1654
3
    }
1655
1656
0
    int64_t right_row = 0;
1657
0
    if constexpr (OFFSET_INDEX == false) {
1658
0
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1659
0
        right_row = _chunk_reader->page_end_row();
1660
    } else {
1661
        right_row = _chunk_reader->page_end_row();
1662
    }
1663
1664
0
    do {
1665
        // generate the row ranges that should be read
1666
0
        RowRanges read_ranges;
1667
0
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
1668
0
        if (read_ranges.count() == 0) {
1669
            // skip the whole page
1670
0
            _current_row_index = right_row;
1671
0
        } else {
1672
0
            bool skip_whole_batch = false;
1673
            // Determining whether to skip page or batch will increase the calculation time.
1674
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
1675
0
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
1676
                // lazy read
1677
0
                size_t remaining_num_values = read_ranges.count();
1678
0
                if (batch_size >= remaining_num_values &&
1679
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
1680
                    // We can skip the whole page if the remaining values are filtered by predicate columns
1681
0
                    _filter_map_index += remaining_num_values;
1682
0
                    _current_row_index = right_row;
1683
0
                    *read_rows = remaining_num_values;
1684
0
                    break;
1685
0
                }
1686
0
                skip_whole_batch = batch_size <= remaining_num_values &&
1687
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
1688
0
                if (skip_whole_batch) {
1689
0
                    _filter_map_index += batch_size;
1690
0
                }
1691
0
            }
1692
            // load page data to decode or skip values
1693
0
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1694
0
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
1695
0
            size_t has_read = 0;
1696
0
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
1697
0
                auto range = read_ranges.get_range(idx);
1698
                // generate the skipped values
1699
0
                size_t skip_values = range.from() - _current_row_index;
1700
0
                RETURN_IF_ERROR(_skip_values(skip_values));
1701
0
                _current_row_index += skip_values;
1702
                // generate the read values
1703
0
                size_t read_values =
1704
0
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
1705
0
                if (skip_whole_batch) {
1706
0
                    RETURN_IF_ERROR(_skip_values(read_values));
1707
0
                } else {
1708
0
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
1709
0
                                                 filter_map, is_dict_filter));
1710
0
                }
1711
0
                has_read += read_values;
1712
0
                *read_rows += read_values;
1713
0
                _current_row_index += read_values;
1714
0
                if (has_read == batch_size) {
1715
0
                    break;
1716
0
                }
1717
0
            }
1718
0
        }
1719
0
    } while (false);
1720
1721
0
    if (right_row == _current_row_index) {
1722
0
        if (!_chunk_reader->has_next_page()) {
1723
0
            *eof = true;
1724
0
        } else {
1725
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
1726
0
        }
1727
0
    }
1728
1729
0
    {
1730
0
        SCOPED_RAW_TIMER(&_convert_time);
1731
0
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
1732
0
                                            doris_column, is_dict_filter));
1733
0
    }
1734
0
    return Status::OK();
1735
0
}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
_ZN5doris18ScalarColumnReaderILb0ELb0EE16read_column_dataERNS_3COWINS_7IColumnEE13immutable_ptrIS3_EERKSt10shared_ptrIKNS_9IDataTypeEERKS8_INS_23TableSchemaChangeHelper4NodeEERNS_9FilterMapEmPmPbbl
Line
Count
Source
1627
284
        int64_t real_column_size) {
1628
284
    if (_converter == nullptr) {
1629
111
        _converter = parquet::PhysicalToLogicalConverter::get_converter(
1630
111
                _field_schema, _field_schema->data_type, type, _ctz, is_dict_filter);
1631
111
        if (!_converter->support()) {
1632
0
            return Status::InternalError(
1633
0
                    "The column type of '{}' is not supported: {}, is_dict_filter: {}, "
1634
0
                    "src_logical_type: {}, dst_logical_type: {}",
1635
0
                    _field_schema->name, _converter->get_error_msg(), is_dict_filter,
1636
0
                    _field_schema->data_type->get_name(), type->get_name());
1637
0
        }
1638
111
    }
1639
    // !FIXME: We should verify whether the get_physical_column logic is correct, why do we return a doris_column?
1640
284
    ColumnPtr resolved_column =
1641
284
            _converter->get_physical_column(_field_schema->physical_type, _field_schema->data_type,
1642
284
                                            doris_column, type, is_dict_filter);
1643
284
    DataTypePtr& resolved_type = _converter->get_physical_type();
1644
1645
284
    _def_levels.clear();
1646
284
    _rep_levels.clear();
1647
284
    *read_rows = 0;
1648
1649
284
    if (_in_nested) {
1650
10
        RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
1651
10
                                            read_rows, eof, is_dict_filter));
1652
10
        return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
1653
10
                                   is_dict_filter);
1654
10
    }
1655
1656
274
    int64_t right_row = 0;
1657
274
    if constexpr (OFFSET_INDEX == false) {
1658
274
        RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1659
274
        right_row = _chunk_reader->page_end_row();
1660
    } else {
1661
        right_row = _chunk_reader->page_end_row();
1662
    }
1663
1664
274
    do {
1665
        // generate the row ranges that should be read
1666
274
        RowRanges read_ranges;
1667
274
        _generate_read_ranges(RowRange {_current_row_index, right_row}, &read_ranges);
1668
274
        if (read_ranges.count() == 0) {
1669
            // skip the whole page
1670
63
            _current_row_index = right_row;
1671
211
        } else {
1672
211
            bool skip_whole_batch = false;
1673
            // Determining whether to skip page or batch will increase the calculation time.
1674
            // When the filtering effect is greater than 60%, it is possible to skip the page or batch.
1675
211
            if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) {
1676
                // lazy read
1677
0
                size_t remaining_num_values = read_ranges.count();
1678
0
                if (batch_size >= remaining_num_values &&
1679
0
                    filter_map.can_filter_all(remaining_num_values, _filter_map_index)) {
1680
                    // We can skip the whole page if the remaining values are filtered by predicate columns
1681
0
                    _filter_map_index += remaining_num_values;
1682
0
                    _current_row_index = right_row;
1683
0
                    *read_rows = remaining_num_values;
1684
0
                    break;
1685
0
                }
1686
0
                skip_whole_batch = batch_size <= remaining_num_values &&
1687
0
                                   filter_map.can_filter_all(batch_size, _filter_map_index);
1688
0
                if (skip_whole_batch) {
1689
0
                    _filter_map_index += batch_size;
1690
0
                }
1691
0
            }
1692
            // load page data to decode or skip values
1693
211
            RETURN_IF_ERROR(_chunk_reader->parse_page_header());
1694
211
            RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
1695
211
            size_t has_read = 0;
1696
344
            for (size_t idx = 0; idx < read_ranges.range_size(); idx++) {
1697
244
                auto range = read_ranges.get_range(idx);
1698
                // generate the skipped values
1699
244
                size_t skip_values = range.from() - _current_row_index;
1700
244
                RETURN_IF_ERROR(_skip_values(skip_values));
1701
244
                _current_row_index += skip_values;
1702
                // generate the read values
1703
244
                size_t read_values =
1704
244
                        std::min((size_t)(range.to() - range.from()), batch_size - has_read);
1705
244
                if (skip_whole_batch) {
1706
0
                    RETURN_IF_ERROR(_skip_values(read_values));
1707
244
                } else {
1708
244
                    RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type,
1709
244
                                                 filter_map, is_dict_filter));
1710
244
                }
1711
244
                has_read += read_values;
1712
244
                *read_rows += read_values;
1713
244
                _current_row_index += read_values;
1714
244
                if (has_read == batch_size) {
1715
111
                    break;
1716
111
                }
1717
244
            }
1718
211
        }
1719
274
    } while (false);
1720
1721
274
    if (right_row == _current_row_index) {
1722
101
        if (!_chunk_reader->has_next_page()) {
1723
101
            *eof = true;
1724
101
        } else {
1725
0
            RETURN_IF_ERROR(_chunk_reader->next_page());
1726
0
        }
1727
101
    }
1728
1729
274
    {
1730
274
        SCOPED_RAW_TIMER(&_convert_time);
1731
274
        RETURN_IF_ERROR(_converter->convert(resolved_column, _field_schema->data_type, type,
1732
274
                                            doris_column, is_dict_filter));
1733
274
    }
1734
274
    return Status::OK();
1735
274
}
1736
1737
Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_reader,
1738
2
                               FieldSchema* field) {
1739
2
    _field_schema = field;
1740
2
    _element_reader = std::move(element_reader);
1741
2
    return Status::OK();
1742
2
}
1743
1744
Status ArrayColumnReader::read_column_data(
1745
        ColumnPtr& doris_column, const DataTypePtr& type,
1746
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
1747
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
1748
2
        int64_t real_column_size) {
1749
2
    MutableColumnPtr data_column;
1750
2
    NullMap* null_map_ptr = nullptr;
1751
2
    if (doris_column->is_nullable()) {
1752
2
        auto mutable_column = doris_column->assume_mutable();
1753
2
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
1754
2
        null_map_ptr = &nullable_column->get_null_map_data();
1755
2
        data_column = nullable_column->get_nested_column_ptr();
1756
2
    } else {
1757
0
        if (_field_schema->data_type->is_nullable()) {
1758
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1759
0
        }
1760
0
        data_column = doris_column->assume_mutable();
1761
0
    }
1762
2
    if (type->get_primitive_type() != PrimitiveType::TYPE_ARRAY) {
1763
0
        return Status::Corruption(
1764
0
                "Wrong data type for column '{}', expected Array type, actual type: {}.",
1765
0
                _field_schema->name, type->get_name());
1766
0
    }
1767
1768
2
    ColumnPtr& element_column = assert_cast<ColumnArray&>(*data_column).get_data_ptr();
1769
2
    const DataTypePtr& element_type =
1770
2
            (assert_cast<const DataTypeArray*>(remove_nullable(type).get()))->get_nested_type();
1771
    // read nested column
1772
2
    RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type,
1773
2
                                                      root_node->get_element_node(), filter_map,
1774
2
                                                      batch_size, read_rows, eof, is_dict_filter));
1775
2
    if (*read_rows == 0) {
1776
0
        return Status::OK();
1777
0
    }
1778
1779
2
    ColumnArray::Offsets64& offsets_data = assert_cast<ColumnArray&>(*data_column).get_offsets();
1780
    // fill offset and null map
1781
2
    fill_array_offset(_field_schema, offsets_data, null_map_ptr, _element_reader->get_rep_level(),
1782
2
                      _element_reader->get_def_level());
1783
2
    DCHECK_EQ(element_column->size(), offsets_data.back());
1784
2
#ifndef NDEBUG
1785
2
    doris_column->sanity_check();
1786
2
#endif
1787
2
    return Status::OK();
1788
2
}
1789
1790
Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader,
1791
                             std::unique_ptr<ParquetColumnReader> value_reader,
1792
0
                             FieldSchema* field) {
1793
0
    _field_schema = field;
1794
0
    _key_reader = std::move(key_reader);
1795
0
    _value_reader = std::move(value_reader);
1796
0
    return Status::OK();
1797
0
}
1798
1799
Status MapColumnReader::read_column_data(
1800
        ColumnPtr& doris_column, const DataTypePtr& type,
1801
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
1802
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
1803
0
        int64_t real_column_size) {
1804
0
    MutableColumnPtr data_column;
1805
0
    NullMap* null_map_ptr = nullptr;
1806
0
    if (doris_column->is_nullable()) {
1807
0
        auto mutable_column = doris_column->assume_mutable();
1808
0
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
1809
0
        null_map_ptr = &nullable_column->get_null_map_data();
1810
0
        data_column = nullable_column->get_nested_column_ptr();
1811
0
    } else {
1812
0
        if (_field_schema->data_type->is_nullable()) {
1813
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1814
0
        }
1815
0
        data_column = doris_column->assume_mutable();
1816
0
    }
1817
0
    if (remove_nullable(type)->get_primitive_type() != PrimitiveType::TYPE_MAP) {
1818
0
        return Status::Corruption(
1819
0
                "Wrong data type for column '{}', expected Map type, actual type id {}.",
1820
0
                _field_schema->name, type->get_name());
1821
0
    }
1822
1823
0
    auto& map = assert_cast<ColumnMap&>(*data_column);
1824
0
    const DataTypePtr& key_type =
1825
0
            assert_cast<const DataTypeMap*>(remove_nullable(type).get())->get_key_type();
1826
0
    const DataTypePtr& value_type =
1827
0
            assert_cast<const DataTypeMap*>(remove_nullable(type).get())->get_value_type();
1828
0
    ColumnPtr& key_column = map.get_keys_ptr();
1829
0
    ColumnPtr& value_column = map.get_values_ptr();
1830
1831
0
    size_t key_rows = 0;
1832
0
    size_t value_rows = 0;
1833
0
    bool key_eof = false;
1834
0
    bool value_eof = false;
1835
0
    int64_t orig_col_column_size = key_column->size();
1836
1837
0
    RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, root_node->get_key_node(),
1838
0
                                                  filter_map, batch_size, &key_rows, &key_eof,
1839
0
                                                  is_dict_filter));
1840
1841
0
    while (value_rows < key_rows && !value_eof) {
1842
0
        size_t loop_rows = 0;
1843
0
        RETURN_IF_ERROR(_value_reader->read_column_data(
1844
0
                value_column, value_type, root_node->get_value_node(), filter_map,
1845
0
                key_rows - value_rows, &loop_rows, &value_eof, is_dict_filter,
1846
0
                key_column->size() - orig_col_column_size));
1847
0
        value_rows += loop_rows;
1848
0
    }
1849
0
    DCHECK_EQ(key_rows, value_rows);
1850
0
    *read_rows = key_rows;
1851
0
    *eof = key_eof;
1852
1853
0
    if (*read_rows == 0) {
1854
0
        return Status::OK();
1855
0
    }
1856
1857
0
    DCHECK_EQ(key_column->size(), value_column->size());
1858
    // fill offset and null map
1859
0
    fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(),
1860
0
                      _key_reader->get_def_level());
1861
0
    DCHECK_EQ(key_column->size(), map.get_offsets().back());
1862
0
#ifndef NDEBUG
1863
0
    doris_column->sanity_check();
1864
0
#endif
1865
0
    return Status::OK();
1866
0
}
1867
1868
Status StructColumnReader::init(
1869
        std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
1870
11
        FieldSchema* field) {
1871
11
    _field_schema = field;
1872
11
    _child_readers = std::move(child_readers);
1873
11
    return Status::OK();
1874
11
}
1875
Status StructColumnReader::read_column_data(
1876
        ColumnPtr& doris_column, const DataTypePtr& type,
1877
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
1878
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
1879
11
        int64_t real_column_size) {
1880
11
    MutableColumnPtr data_column;
1881
11
    NullMap* null_map_ptr = nullptr;
1882
11
    if (doris_column->is_nullable()) {
1883
11
        auto mutable_column = doris_column->assume_mutable();
1884
11
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
1885
11
        null_map_ptr = &nullable_column->get_null_map_data();
1886
11
        data_column = nullable_column->get_nested_column_ptr();
1887
11
    } else {
1888
0
        if (_field_schema->data_type->is_nullable()) {
1889
0
            return Status::Corruption("Not nullable column has null values in parquet file");
1890
0
        }
1891
0
        data_column = doris_column->assume_mutable();
1892
0
    }
1893
11
    if (type->get_primitive_type() != PrimitiveType::TYPE_STRUCT) {
1894
0
        return Status::Corruption(
1895
0
                "Wrong data type for column '{}', expected Struct type, actual type id {}.",
1896
0
                _field_schema->name, type->get_name());
1897
0
    }
1898
1899
11
    auto& doris_struct = assert_cast<ColumnStruct&>(*data_column);
1900
11
    const auto* doris_struct_type = assert_cast<const DataTypeStruct*>(remove_nullable(type).get());
1901
1902
11
    int64_t not_missing_column_id = -1;
1903
11
    size_t not_missing_orig_column_size = 0;
1904
11
    std::vector<size_t> missing_column_idxs {};
1905
11
    std::vector<size_t> skip_reading_column_idxs {};
1906
1907
11
    _read_column_names.clear();
1908
1909
37
    for (size_t i = 0; i < doris_struct.tuple_size(); ++i) {
1910
26
        ColumnPtr& doris_field = doris_struct.get_column_ptr(i);
1911
26
        auto& doris_type = doris_struct_type->get_element(i);
1912
26
        auto& doris_name = doris_struct_type->get_element_name(i);
1913
26
        if (!root_node->children_column_exists(doris_name)) {
1914
0
            missing_column_idxs.push_back(i);
1915
0
            VLOG_DEBUG << "[ParquetReader] Missing column in schema: column_idx[" << i
1916
0
                       << "], doris_name: " << doris_name << " (column not exists in root node)";
1917
0
            continue;
1918
0
        }
1919
26
        auto file_name = root_node->children_file_column_name(doris_name);
1920
1921
        // Check if this is a SkipReadingReader - we should skip it when choosing reference column
1922
        // because SkipReadingReader doesn't know the actual data size in nested context
1923
26
        bool is_skip_reader =
1924
26
                dynamic_cast<SkipReadingReader*>(_child_readers[file_name].get()) != nullptr;
1925
1926
26
        if (is_skip_reader) {
1927
            // Store SkipReadingReader columns to fill them later based on reference column size
1928
4
            skip_reading_column_idxs.push_back(i);
1929
4
            continue;
1930
4
        }
1931
1932
        // Only add non-SkipReadingReader columns to _read_column_names
1933
        // This ensures get_rep_level() and get_def_level() return valid levels
1934
22
        _read_column_names.emplace_back(file_name);
1935
1936
22
        size_t field_rows = 0;
1937
22
        bool field_eof = false;
1938
22
        if (not_missing_column_id == -1) {
1939
11
            not_missing_column_id = i;
1940
11
            not_missing_orig_column_size = doris_field->size();
1941
11
            RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
1942
11
                    doris_field, doris_type, root_node->get_children_node(doris_name), filter_map,
1943
11
                    batch_size, &field_rows, &field_eof, is_dict_filter));
1944
11
            *read_rows = field_rows;
1945
11
            *eof = field_eof;
1946
            /*
1947
             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
1948
             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
1949
             * crucial to use the definition and repetition levels from the first read column
1950
             * (since `_read_nested_column` is not called repeatedly).
1951
             *
1952
             *  It is worth mentioning that, theoretically, any sub-column can be chosen to fill the null_map,
1953
             *  and selecting the shortest one will offer better performance
1954
             */
1955
11
        } else {
1956
22
            while (field_rows < *read_rows && !field_eof) {
1957
11
                size_t loop_rows = 0;
1958
11
                RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
1959
11
                        doris_field, doris_type, root_node->get_children_node(doris_name),
1960
11
                        filter_map, *read_rows - field_rows, &loop_rows, &field_eof,
1961
11
                        is_dict_filter));
1962
11
                field_rows += loop_rows;
1963
11
            }
1964
11
            DCHECK_EQ(*read_rows, field_rows);
1965
            //            DCHECK_EQ(*eof, field_eof);
1966
11
        }
1967
22
    }
1968
1969
11
    int64_t missing_column_sz = -1;
1970
1971
11
    if (not_missing_column_id == -1) {
1972
        // All queried columns are missing in the file (e.g., all added after schema change)
1973
        // We need to pick a column from _field_schema children that exists in the file for RL/DL reference
1974
0
        std::string reference_file_column_name;
1975
0
        std::unique_ptr<ParquetColumnReader>* reference_reader = nullptr;
1976
1977
0
        for (const auto& child : _field_schema->children) {
1978
0
            auto it = _child_readers.find(child.name);
1979
0
            if (it != _child_readers.end()) {
1980
                // Skip SkipReadingReader as they don't have valid RL/DL
1981
0
                bool is_skip_reader = dynamic_cast<SkipReadingReader*>(it->second.get()) != nullptr;
1982
0
                if (!is_skip_reader) {
1983
0
                    reference_file_column_name = child.name;
1984
0
                    reference_reader = &(it->second);
1985
0
                    break;
1986
0
                }
1987
0
            }
1988
0
        }
1989
1990
0
        if (reference_reader != nullptr) {
1991
            // Read the reference column to get correct RL/DL information
1992
            // TODO: Optimize by only reading RL/DL without actual data decoding
1993
1994
            // We need to find the FieldSchema for the reference column from _field_schema children
1995
0
            FieldSchema* ref_field_schema = nullptr;
1996
0
            for (auto& child : _field_schema->children) {
1997
0
                if (child.name == reference_file_column_name) {
1998
0
                    ref_field_schema = &child;
1999
0
                    break;
2000
0
                }
2001
0
            }
2002
2003
0
            if (ref_field_schema == nullptr) {
2004
0
                return Status::InternalError(
2005
0
                        "Cannot find field schema for reference column '{}' in struct '{}'",
2006
0
                        reference_file_column_name, _field_schema->name);
2007
0
            }
2008
2009
            // Create a temporary column to hold the data (we'll use its size for missing_column_sz)
2010
0
            ColumnPtr temp_column = ref_field_schema->data_type->create_column();
2011
0
            auto temp_type = ref_field_schema->data_type;
2012
2013
0
            size_t field_rows = 0;
2014
0
            bool field_eof = false;
2015
2016
            // Use ConstNode for the reference column instead of looking up from root_node.
2017
            // The reference column is only used to get RL/DL information for determining the number
2018
            // of elements in the struct. It may be a column that has been dropped from the table
2019
            // schema (e.g., 'removed' field), but still exists in older parquet files.
2020
            // Since we don't need schema mapping for this column (we just need its RL/DL levels),
2021
            // using ConstNode is safe and avoids the issue where the reference column doesn't exist
2022
            // in root_node (because it was dropped from table schema).
2023
0
            auto ref_child_node = TableSchemaChangeHelper::ConstNode::get_instance();
2024
0
            not_missing_orig_column_size = temp_column->size();
2025
2026
0
            RETURN_IF_ERROR((*reference_reader)
2027
0
                                    ->read_column_data(temp_column, temp_type, ref_child_node,
2028
0
                                                       filter_map, batch_size, &field_rows,
2029
0
                                                       &field_eof, is_dict_filter));
2030
2031
0
            *read_rows = field_rows;
2032
0
            *eof = field_eof;
2033
2034
            // Store this reference column name for get_rep_level/get_def_level to use
2035
0
            _read_column_names.emplace_back(reference_file_column_name);
2036
2037
0
            missing_column_sz = temp_column->size() - not_missing_orig_column_size;
2038
0
        } else {
2039
0
            return Status::Corruption(
2040
0
                    "Cannot read struct '{}': all queried columns are missing and no reference "
2041
0
                    "column found in file",
2042
0
                    _field_schema->name);
2043
0
        }
2044
0
    }
2045
2046
    //  This missing_column_sz is not *read_rows. Because read_rows returns the number of rows.
2047
    //  For example: suppose we have a column array<struct<a:int,b:string>>,
2048
    //  where b is a newly added column, that is, a missing column.
2049
    //  There are two rows of data in this column,
2050
    //      [{1,null},{2,null},{3,null}]
2051
    //      [{4,null},{5,null}]
2052
    //  When you first read subcolumn a, you read 5 data items and the value of *read_rows is 2.
2053
    //  You should insert 5 records into subcolumn b instead of 2.
2054
11
    if (missing_column_sz == -1) {
2055
11
        missing_column_sz = doris_struct.get_column(not_missing_column_id).size() -
2056
11
                            not_missing_orig_column_size;
2057
11
    }
2058
2059
    // Fill SkipReadingReader columns with the correct amount of data based on reference column
2060
    // Let SkipReadingReader handle the data filling through its read_column_data method
2061
11
    for (auto idx : skip_reading_column_idxs) {
2062
4
        auto& doris_field = doris_struct.get_column_ptr(idx);
2063
4
        auto& doris_type = const_cast<DataTypePtr&>(doris_struct_type->get_element(idx));
2064
4
        auto& doris_name = const_cast<String&>(doris_struct_type->get_element_name(idx));
2065
4
        auto file_name = root_node->children_file_column_name(doris_name);
2066
2067
4
        size_t field_rows = 0;
2068
4
        bool field_eof = false;
2069
4
        RETURN_IF_ERROR(_child_readers[file_name]->read_column_data(
2070
4
                doris_field, doris_type, root_node->get_children_node(doris_name), filter_map,
2071
4
                missing_column_sz, &field_rows, &field_eof, is_dict_filter, missing_column_sz));
2072
4
    }
2073
2074
    // Fill truly missing columns (not in root_node) with null or default value
2075
11
    for (auto idx : missing_column_idxs) {
2076
0
        auto& doris_field = doris_struct.get_column_ptr(idx);
2077
0
        auto& doris_type = doris_struct_type->get_element(idx);
2078
0
        DCHECK(doris_type->is_nullable());
2079
0
        auto mutable_column = doris_field->assume_mutable();
2080
0
        auto* nullable_column = static_cast<ColumnNullable*>(mutable_column.get());
2081
0
        nullable_column->insert_many_defaults(missing_column_sz);
2082
0
    }
2083
2084
11
    if (null_map_ptr != nullptr) {
2085
11
        fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
2086
11
                             this->get_def_level());
2087
11
    }
2088
11
#ifndef NDEBUG
2089
11
    doris_column->sanity_check();
2090
11
#endif
2091
11
    return Status::OK();
2092
11
}
2093
2094
Status VariantColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
2095
                                 const tparquet::RowGroup& row_group, size_t max_buf_size,
2096
                                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
2097
                                 RuntimeState* state, bool in_collection,
2098
                                 const std::set<uint64_t>& column_ids,
2099
0
                                 const std::set<uint64_t>& filter_column_ids) {
2100
0
    _field_schema = field;
2101
0
    _column_ids = column_ids;
2102
0
    _variant_struct_field = std::make_unique<FieldSchema>(*field);
2103
2104
0
    DataTypes child_types;
2105
0
    Strings child_names;
2106
0
    child_types.reserve(field->children.size());
2107
0
    child_names.reserve(field->children.size());
2108
0
    for (const auto& child : field->children) {
2109
0
        child_types.push_back(make_nullable(child.data_type));
2110
0
        child_names.push_back(child.name);
2111
0
    }
2112
0
    DataTypePtr variant_struct_type = std::make_shared<DataTypeStruct>(child_types, child_names);
2113
0
    if (field->data_type->is_nullable()) {
2114
0
        variant_struct_type = make_nullable(variant_struct_type);
2115
0
    }
2116
0
    _variant_struct_field->data_type = variant_struct_type;
2117
2118
0
    RETURN_IF_ERROR(ParquetColumnReader::create(file, _variant_struct_field.get(), row_group,
2119
0
                                                _row_ranges, _ctz, _io_ctx, _struct_reader,
2120
0
                                                max_buf_size, col_offsets, state, in_collection,
2121
0
                                                column_ids, filter_column_ids));
2122
0
    _struct_reader->set_column_in_nested();
2123
0
    return Status::OK();
2124
0
}
2125
2126
// NOLINTNEXTLINE(readability-function-size): existing variant read path stays local to avoid churn.
2127
Status VariantColumnReader::read_column_data(
2128
        ColumnPtr& doris_column, const DataTypePtr& type,
2129
        const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, FilterMap& filter_map,
2130
        size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter,
2131
0
        int64_t real_column_size) {
2132
0
    (void)root_node;
2133
0
    if (remove_nullable(type)->get_primitive_type() != PrimitiveType::TYPE_VARIANT) {
2134
0
        return Status::Corruption(
2135
0
                "Wrong data type for column '{}', expected Variant type, actual type: {}.",
2136
0
                _field_schema->name, type->get_name());
2137
0
    }
2138
2139
0
    const auto& variant_struct_type = _variant_struct_field->data_type;
2140
0
    ColumnPtr struct_column = variant_struct_type->create_column();
2141
0
    const size_t old_struct_rows = struct_column->size();
2142
0
    auto const_node = TableSchemaChangeHelper::ConstNode::get_instance();
2143
0
    RETURN_IF_ERROR(_struct_reader->read_column_data(struct_column, variant_struct_type, const_node,
2144
0
                                                     filter_map, batch_size, read_rows, eof,
2145
0
                                                     is_dict_filter, real_column_size));
2146
2147
0
    const size_t new_struct_rows = struct_column->size() - old_struct_rows;
2148
0
    if (new_struct_rows == 0) {
2149
0
        return Status::OK();
2150
0
    }
2151
2152
0
    MutableColumnPtr variant_column_ptr;
2153
0
    NullMap* null_map_ptr = nullptr;
2154
0
    auto mutable_column = doris_column->assume_mutable();
2155
0
    if (doris_column->is_nullable()) {
2156
0
        auto* nullable_column = assert_cast<ColumnNullable*>(mutable_column.get());
2157
0
        variant_column_ptr = nullable_column->get_nested_column_ptr();
2158
0
        null_map_ptr = &nullable_column->get_null_map_data();
2159
0
    } else {
2160
0
        if (_field_schema->data_type->is_nullable()) {
2161
0
            return Status::Corruption("Not nullable column has null values in parquet file");
2162
0
        }
2163
0
        variant_column_ptr = std::move(mutable_column);
2164
0
    }
2165
0
    auto* variant_column = assert_cast<ColumnVariant*>(variant_column_ptr.get());
2166
2167
0
    const IColumn* variant_struct_source = struct_column.get();
2168
0
    const NullMap* struct_null_map = nullptr;
2169
0
    if (const auto* nullable_struct = check_and_get_column<ColumnNullable>(variant_struct_source)) {
2170
0
        struct_null_map = &nullable_struct->get_null_map_data();
2171
0
        variant_struct_source = &nullable_struct->get_nested_column();
2172
0
    }
2173
0
    const auto& variant_struct_column = assert_cast<const ColumnStruct&>(*variant_struct_source);
2174
2175
0
    const int typed_value_idx = find_child_idx(*_field_schema, "typed_value");
2176
0
    if (can_use_direct_typed_only_value(*_field_schema, _column_ids)) {
2177
0
        _variant_statistics.variant_direct_typed_value_read_rows +=
2178
0
                static_cast<int64_t>(new_struct_rows);
2179
0
        MutableColumnPtr batch_variant_column =
2180
0
                ColumnVariant::create(variant_column->max_subcolumns_count(),
2181
0
                                      variant_column->enable_doc_mode(), new_struct_rows + 1);
2182
0
        auto* batch_variant = assert_cast<ColumnVariant*>(batch_variant_column.get());
2183
0
        PathInDataBuilder path;
2184
0
        RETURN_IF_ERROR(append_direct_typed_column_to_batch(
2185
0
                _field_schema->children[typed_value_idx],
2186
0
                variant_struct_column.get_column(typed_value_idx), old_struct_rows, new_struct_rows,
2187
0
                &path, batch_variant, false, _column_ids, {}));
2188
0
        variant_column->insert_range_from(*batch_variant_column, 1, new_struct_rows);
2189
0
        if (null_map_ptr != nullptr) {
2190
0
            for (size_t i = old_struct_rows; i < struct_column->size(); ++i) {
2191
0
                null_map_ptr->push_back(struct_null_map != nullptr && (*struct_null_map)[i]);
2192
0
            }
2193
0
        }
2194
0
#ifndef NDEBUG
2195
0
        doris_column->sanity_check();
2196
0
#endif
2197
0
        return Status::OK();
2198
0
    }
2199
2200
0
    _variant_statistics.variant_rowwise_read_rows += static_cast<int64_t>(new_struct_rows);
2201
0
    for (size_t i = old_struct_rows; i < struct_column->size(); ++i) {
2202
0
        if (struct_null_map != nullptr && (*struct_null_map)[i]) {
2203
0
            if (null_map_ptr == nullptr) {
2204
0
                return Status::Corruption("Not nullable column has null values in parquet file");
2205
0
            }
2206
0
            variant_column->insert_default();
2207
0
            null_map_ptr->push_back(1);
2208
0
            continue;
2209
0
        }
2210
0
        VariantMap values;
2211
0
        bool present = false;
2212
0
        PathInDataBuilder path;
2213
0
        RETURN_IF_ERROR(variant_to_variant_map(*_field_schema, (*struct_column)[i], nullptr, &path,
2214
0
                                               &values, &present));
2215
0
        if (!present) {
2216
0
            values[PathInData()] = FieldWithDataType {.field = Field()};
2217
0
        }
2218
0
        RETURN_IF_CATCH_EXCEPTION(
2219
0
                variant_column->insert(Field::create_field<TYPE_VARIANT>(std::move(values))));
2220
0
        if (null_map_ptr != nullptr) {
2221
0
            null_map_ptr->push_back(0);
2222
0
        }
2223
0
    }
2224
0
#ifndef NDEBUG
2225
0
    doris_column->sanity_check();
2226
0
#endif
2227
0
    return Status::OK();
2228
0
}
2229
2230
template class ScalarColumnReader<true, true>;
2231
template class ScalarColumnReader<true, false>;
2232
template class ScalarColumnReader<false, true>;
2233
template class ScalarColumnReader<false, false>;
2234
2235
}; // namespace doris