Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/decoder.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
#include <glog/logging.h>
22
23
#include <cstddef>
24
#include <cstdint>
25
#include <memory>
26
#include <ostream>
27
#include <vector>
28
29
#include "common/status.h"
30
#include "core/assert_cast.h"
31
#include "core/column/column.h"
32
#include "core/column/column_dictionary.h"
33
#include "core/column/column_vector.h"
34
#include "core/custom_allocator.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_decimal.h" // IWYU pragma: keep
37
#include "core/data_type/data_type_nullable.h"
38
#include "core/pod_array_fwd.h"
39
#include "core/types.h"
40
#include "format/parquet/parquet_common.h"
41
#include "util/rle_encoding.h"
42
#include "util/slice.h"
43
44
namespace doris {
45
template <typename T>
46
class ColumnStr;
47
using ColumnString = ColumnStr<UInt32>;
48
49
class Decoder {
50
public:
51
307
    Decoder() = default;
52
299
    virtual ~Decoder() = default;
53
54
    static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
55
                              std::unique_ptr<Decoder>& decoder);
56
57
    // Set the value length used by fixed-length types
58
204
    void set_type_length(int32_t type_length) { _type_length = type_length; }
59
60
    // Set the data to be decoded
61
123
    virtual Status set_data(Slice* data) {
62
123
        _data = data;
63
123
        _offset = 0;
64
123
        return Status::OK();
65
123
    }
66
67
    // Write the batch of decoded values into the Doris column
68
    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
69
                                 ColumnSelectVector& select_vector, bool is_dict_filter) = 0;
70
71
    virtual Status skip_values(size_t num_values) = 0;
72
73
    virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
74
0
                            size_t num_values) {
75
0
        return Status::NotSupported("set_dict is not supported");
76
0
    }
77
78
0
    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) {
79
0
        return Status::NotSupported("read_dict_values_to_column is not supported");
80
0
    }
81
82
    virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
83
0
            const ColumnInt32* dict_column) {
84
0
        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
85
0
                               "Method convert_dict_column_to_string_column is not supported");
86
0
    }
87
88
protected:
89
    // Length for fixed-length types; stays -1 (unset) until set_type_length() is called,
    // so it is never read as an indeterminate value.
    int32_t _type_length = -1;
90
    Slice* _data = nullptr;
91
    uint32_t _offset = 0;
92
};
93
94
class BaseDictDecoder : public Decoder {
95
public:
96
66
    BaseDictDecoder() = default;
97
66
    ~BaseDictDecoder() override = default;
98
99
    // Set the data to be decoded
100
49
    Status set_data(Slice* data) override {
101
49
        _data = data;
102
49
        _offset = 0;
103
49
        uint8_t bit_width = *data->data;
104
49
        _index_batch_decoder = std::make_unique<RleBatchDecoder<uint32_t>>(
105
49
                reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1,
106
49
                bit_width);
107
49
        return Status::OK();
108
49
    }
109
110
protected:
111
    /**
112
     * Decode dictionary-coded values into doris_column. doris_column must be a ColumnDictI32 (or a ColumnInt32 when is_dict_filter is set),
113
     * and the coded values must already have been read into _indexes.
114
     */
115
    template <bool has_filter>
116
    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector,
117
13
                               bool is_dict_filter) {
118
13
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
119
13
        size_t dict_index = 0;
120
13
        ColumnSelectVector::DataReadType read_type;
121
13
        PaddedPODArray<Int32>& column_data =
122
13
                doris_column->is_column_dictionary()
123
13
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
124
13
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
125
66
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
126
53
            switch (read_type) {
127
29
            case ColumnSelectVector::CONTENT: {
128
29
                uint32_t* start_index = _indexes.data();
129
29
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
130
29
                dict_index += run_length;
131
29
                break;
132
0
            }
133
8
            case ColumnSelectVector::NULL_DATA: {
134
8
                doris_column->insert_many_defaults(run_length);
135
8
                break;
136
0
            }
137
16
            case ColumnSelectVector::FILTERED_CONTENT: {
138
16
                dict_index += run_length;
139
16
                break;
140
0
            }
141
0
            case ColumnSelectVector::FILTERED_NULL: {
142
0
                break;
143
0
            }
144
53
            }
145
53
        }
146
13
        return Status::OK();
147
13
    }
_ZN5doris15BaseDictDecoder19_decode_dict_valuesILb1EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb
Line
Count
Source
117
8
                               bool is_dict_filter) {
118
8
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
119
8
        size_t dict_index = 0;
120
8
        ColumnSelectVector::DataReadType read_type;
121
8
        PaddedPODArray<Int32>& column_data =
122
8
                doris_column->is_column_dictionary()
123
8
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
124
8
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
125
56
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
126
48
            switch (read_type) {
127
24
            case ColumnSelectVector::CONTENT: {
128
24
                uint32_t* start_index = _indexes.data();
129
24
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
130
24
                dict_index += run_length;
131
24
                break;
132
0
            }
133
8
            case ColumnSelectVector::NULL_DATA: {
134
8
                doris_column->insert_many_defaults(run_length);
135
8
                break;
136
0
            }
137
16
            case ColumnSelectVector::FILTERED_CONTENT: {
138
16
                dict_index += run_length;
139
16
                break;
140
0
            }
141
0
            case ColumnSelectVector::FILTERED_NULL: {
142
0
                break;
143
0
            }
144
48
            }
145
48
        }
146
8
        return Status::OK();
147
8
    }
_ZN5doris15BaseDictDecoder19_decode_dict_valuesILb0EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb
Line
Count
Source
117
5
                               bool is_dict_filter) {
118
5
        DCHECK(doris_column->is_column_dictionary() || is_dict_filter);
119
5
        size_t dict_index = 0;
120
5
        ColumnSelectVector::DataReadType read_type;
121
5
        PaddedPODArray<Int32>& column_data =
122
5
                doris_column->is_column_dictionary()
123
5
                        ? assert_cast<ColumnDictI32&>(*doris_column).get_data()
124
5
                        : assert_cast<ColumnInt32&>(*doris_column).get_data();
125
10
        while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
126
5
            switch (read_type) {
127
5
            case ColumnSelectVector::CONTENT: {
128
5
                uint32_t* start_index = _indexes.data();
129
5
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
130
5
                dict_index += run_length;
131
5
                break;
132
0
            }
133
0
            case ColumnSelectVector::NULL_DATA: {
134
0
                doris_column->insert_many_defaults(run_length);
135
0
                break;
136
0
            }
137
0
            case ColumnSelectVector::FILTERED_CONTENT: {
138
0
                dict_index += run_length;
139
0
                break;
140
0
            }
141
0
            case ColumnSelectVector::FILTERED_NULL: {
142
0
                break;
143
0
            }
144
5
            }
145
5
        }
146
5
        return Status::OK();
147
5
    }
148
149
2
    Status skip_values(size_t num_values) override {
150
2
        _indexes.resize(num_values);
151
2
        _index_batch_decoder->GetBatch(_indexes.data(), cast_set<uint32_t>(num_values));
152
2
        return Status::OK();
153
2
    }
154
155
    // For dictionary encoding
156
    DorisUniqueBufferPtr<uint8_t> _dict;
157
    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder;
158
    std::vector<uint32_t> _indexes;
159
};
160
161
} // namespace doris