Coverage Report

Created: 2026-03-14 17:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/decoder.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
#include <glog/logging.h>
22
23
#include <cstddef>
24
#include <cstdint>
25
#include <memory>
26
#include <ostream>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "core/assert_cast.h"
31
#include "core/column/column.h"
32
#include "core/column/column_dictionary.h"
33
#include "core/column/column_vector.h"
34
#include "core/custom_allocator.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_decimal.h" // IWYU pragma: keep
37
#include "core/data_type/data_type_nullable.h"
38
#include "core/pod_array_fwd.h"
39
#include "core/types.h"
40
#include "format/parquet/parquet_common.h"
41
#include "util/rle_encoding.h"
42
#include "util/slice.h"
43
44
namespace doris {
45
#include "common/compile_check_begin.h"
46
template <typename T>
47
class ColumnStr;
48
using ColumnString = ColumnStr<UInt32>;
49
50
class Decoder {
51
public:
52
173k
    Decoder() = default;
53
173k
    virtual ~Decoder() = default;
54
55
    static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
56
                              std::unique_ptr<Decoder>& decoder);
57
58
    // The type with fix length
59
172k
    void set_type_length(int32_t type_length) { _type_length = type_length; }
60
61
    // Set the data to be decoded
62
413k
    virtual Status set_data(Slice* data) {
63
413k
        _data = data;
64
413k
        _offset = 0;
65
413k
        return Status::OK();
66
413k
    }
67
68
    // Write the decoded values batch to doris's column
69
    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
70
                                 ColumnSelectVector& select_vector, bool is_dict_filter) = 0;
71
72
    virtual Status skip_values(size_t num_values) = 0;
73
74
    virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
75
0
                            size_t num_values) {
76
0
        return Status::NotSupported("set_dict is not supported");
77
0
    }
78
79
0
    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
80
0
        return Status::NotSupported("read_dict_values_to_column is not supported");
81
0
    }
82
83
    virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
84
0
            const ColumnInt32* dict_column) {
85
0
        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
86
0
                               "Method convert_dict_column_to_string_column is not supported");
87
0
    }
88
89
protected:
90
    int32_t _type_length;
91
    Slice* _data = nullptr;
92
    uint32_t _offset = 0;
93
};
94
95
class BaseDictDecoder : public Decoder {
96
public:
97
91.6k
    BaseDictDecoder() = default;
98
91.5k
    ~BaseDictDecoder() override = default;
99
100
    // Set the data to be decoded
101
466k
    Status set_data(Slice* data) override {
102
466k
        _data = data;
103
466k
        _offset = 0;
104
466k
        uint8_t bit_width = *data->data;
105
466k
        _index_batch_decoder = std::make_unique<RleBatchDecoder<uint32_t>>(
106
466k
                reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1,
107
466k
                bit_width);
108
466k
        return Status::OK();
109
466k
    }
110
111
protected:
112
    /**
113
     * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type,
114
     * and the coded values must be read into _indexes previously.
115
     */
116
    template <bool has_filter>
117
    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector,
118
554
                               bool is_dict_filter) {
119
554
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
120
554
        size_t dict_index = 0;
121
554
        ColumnSelectVector::DataReadType read_type;
122
554
        PaddedPODArray<Int32>& column_data =
123
554
                doris_column->is_column_dictionary()
124
554
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
125
554
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
126
1.18k
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
127
626
            switch (read_type) {
128
586
            case ColumnSelectVector::CONTENT: {
129
586
                uint32_t* start_index = _indexes.data();
130
586
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
131
586
                dict_index += run_length;
132
586
                break;
133
0
            }
134
24
            case ColumnSelectVector::NULL_DATA: {
135
24
                doris_column->insert_many_defaults(run_length);
136
24
                break;
137
0
            }
138
16
            case ColumnSelectVector::FILTERED_CONTENT: {
139
16
                dict_index += run_length;
140
16
                break;
141
0
            }
142
0
            case ColumnSelectVector::FILTERED_NULL: {
143
0
                break;
144
0
            }
145
626
            }
146
626
        }
147
554
        return Status::OK();
148
554
    }
_ZN5doris15BaseDictDecoder19_decode_dict_valuesILb1EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb
Line
Count
Source
118
8
                               bool is_dict_filter) {
119
8
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
120
8
        size_t dict_index = 0;
121
8
        ColumnSelectVector::DataReadType read_type;
122
8
        PaddedPODArray<Int32>& column_data =
123
8
                doris_column->is_column_dictionary()
124
8
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
125
8
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
126
56
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
127
48
            switch (read_type) {
128
24
            case ColumnSelectVector::CONTENT: {
129
24
                uint32_t* start_index = _indexes.data();
130
24
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
131
24
                dict_index += run_length;
132
24
                break;
133
0
            }
134
8
            case ColumnSelectVector::NULL_DATA: {
135
8
                doris_column->insert_many_defaults(run_length);
136
8
                break;
137
0
            }
138
16
            case ColumnSelectVector::FILTERED_CONTENT: {
139
16
                dict_index += run_length;
140
16
                break;
141
0
            }
142
0
            case ColumnSelectVector::FILTERED_NULL: {
143
0
                break;
144
0
            }
145
48
            }
146
48
        }
147
8
        return Status::OK();
148
8
    }
_ZN5doris15BaseDictDecoder19_decode_dict_valuesILb0EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb
Line
Count
Source
118
546
                               bool is_dict_filter) {
119
546
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
120
546
        size_t dict_index = 0;
121
546
        ColumnSelectVector::DataReadType read_type;
122
546
        PaddedPODArray<Int32>& column_data =
123
546
                doris_column->is_column_dictionary()
124
546
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
125
546
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
126
1.12k
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
127
578
            switch (read_type) {
128
562
            case ColumnSelectVector::CONTENT: {
129
562
                uint32_t* start_index = _indexes.data();
130
562
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
131
562
                dict_index += run_length;
132
562
                break;
133
0
            }
134
16
            case ColumnSelectVector::NULL_DATA: {
135
16
                doris_column->insert_many_defaults(run_length);
136
16
                break;
137
0
            }
138
0
            case ColumnSelectVector::FILTERED_CONTENT: {
139
0
                dict_index += run_length;
140
0
                break;
141
0
            }
142
0
            case ColumnSelectVector::FILTERED_NULL: {
143
0
                break;
144
0
            }
145
578
            }
146
578
        }
147
546
        return Status::OK();
148
546
    }
149
150
37.0k
    Status skip_values(size_t num_values) override {
151
37.0k
        _indexes.resize(num_values);
152
37.0k
        _index_batch_decoder->GetBatch(_indexes.data(), cast_set<uint32_t>(num_values));
153
37.0k
        return Status::OK();
154
37.0k
    }
155
156
    // For dictionary encoding
157
    DorisUniqueBufferPtr<uint8_t> _dict;
158
    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder;
159
    std::vector<uint32_t> _indexes;
160
};
161
#include "common/compile_check_end.h"
162
163
} // namespace doris