Coverage Report

Created: 2026-03-24 14:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/byte_stream_split.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/byte_stream_split.h"
19
20
#include <glog/logging.h>
21
22
#include <array>
23
#include <bit> // IWYU pragma: keep
24
#include <cstring>
25
#include <vector>
26
27
namespace doris {
28
29
/// Interleaves `width` byte streams into a single destination buffer.
///
/// For every value index `i` in `[0, nvalues)` and every stream `s` in
/// `[0, width)`, writes `dest[i * width + s] = src_streams[s][i]`.
///
/// @param src_streams Array of `width` stream pointers. Each entry is advanced
///                    by kBlockSize for every full block that is processed, so
///                    the caller's array is modified.
/// @param width       Number of streams.
/// @param nvalues     Number of bytes to take from each stream.
/// @param dest        Output buffer; `width * nvalues` bytes are written.
inline void do_merge_streams(const uint8_t** src_streams, int width, int64_t nvalues,
                             uint8_t* dest) {
    // Value empirically chosen to provide the best performance on the author's machine
    constexpr int kBlockSize = 128;
    // Number of bytes fetched from a stream with one load.
    constexpr int kChunkSize = 8;
    static_assert(kBlockSize % kChunkSize == 0, "a block must hold a whole number of chunks");

    while (nvalues >= kBlockSize) {
        for (int stream = 0; stream < width; ++stream) {
            // Take kBlockSize bytes from the given stream and spread them
            // to their logical places in destination.
            const uint8_t* src = src_streams[stream];
            for (int i = 0; i < kBlockSize; i += kChunkSize) {
                // Fetch the whole chunk before storing anything, then scatter the
                // bytes in memory order. Working on bytes instead of shifting a
                // 64-bit integer makes the result independent of the host byte
                // order, so no endianness-specific branches are needed.
                uint8_t chunk[kChunkSize];
                std::memcpy(chunk, src + i, sizeof(chunk));
                uint8_t* out = dest + stream + i * width;
                for (int k = 0; k < kChunkSize; ++k) {
                    out[k * width] = chunk[k];
                }
            }
            src_streams[stream] += kBlockSize;
        }
        dest += width * kBlockSize;
        nvalues -= kBlockSize;
    }

    // Epilog: fewer than kBlockSize values remain, copy them one byte at a time.
    for (int stream = 0; stream < width; ++stream) {
        const uint8_t* src = src_streams[stream];
        for (int64_t i = 0; i < nvalues; ++i) {
            dest[stream + i * width] = src[i];
        }
    }
}
76
77
template <int kNumStreams>
78
void byte_stream_split_decode_scalar(const uint8_t* src, int width, int64_t offset,
79
33
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
33
    DCHECK(width == kNumStreams);
81
33
    std::array<const uint8_t*, kNumStreams> src_streams;
82
219
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
186
        src_streams[stream] = &src[stream * stride + offset];
84
186
    }
85
33
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
33
}
_ZN5doris31byte_stream_split_decode_scalarILi2EEEvPKhilllPh
Line
Count
Source
79
3
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
3
    DCHECK(width == kNumStreams);
81
3
    std::array<const uint8_t*, kNumStreams> src_streams;
82
9
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
6
        src_streams[stream] = &src[stream * stride + offset];
84
6
    }
85
3
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
3
}
_ZN5doris31byte_stream_split_decode_scalarILi4EEEvPKhilllPh
Line
Count
Source
79
15
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
15
    DCHECK(width == kNumStreams);
81
15
    std::array<const uint8_t*, kNumStreams> src_streams;
82
75
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
60
        src_streams[stream] = &src[stream * stride + offset];
84
60
    }
85
15
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
15
}
_ZN5doris31byte_stream_split_decode_scalarILi8EEEvPKhilllPh
Line
Count
Source
79
15
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
15
    DCHECK(width == kNumStreams);
81
15
    std::array<const uint8_t*, kNumStreams> src_streams;
82
135
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
120
        src_streams[stream] = &src[stream * stride + offset];
84
120
    }
85
15
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
15
}
Unexecuted instantiation: _ZN5doris31byte_stream_split_decode_scalarILi16EEEvPKhilllPh
87
88
inline void byte_stream_split_decode_scalar_dynamic(const uint8_t* src, int width, int64_t offset,
89
                                                    int64_t num_values, int64_t stride,
90
1
                                                    uint8_t* dest) {
91
1
    std::vector<const uint8_t*> src_streams;
92
1
    src_streams.resize(width);
93
6
    for (int stream = 0; stream < width; ++stream) {
94
5
        src_streams[stream] = &src[stream * stride + offset];
95
5
    }
96
1
    do_merge_streams(src_streams.data(), width, num_values, dest);
97
1
}
98
99
// TODO: optimize using simd: https://github.com/apache/arrow/pull/38529
100
void byte_stream_split_decode(const uint8_t* src, int width, int64_t offset, int64_t num_values,
101
36
                              int64_t stride, uint8_t* dest) {
102
36
    switch (width) {
103
2
    case 1:
104
2
        memcpy(dest, src + offset * width, num_values);
105
2
        return;
106
3
    case 2:
107
3
        return byte_stream_split_decode_scalar<2>(src, width, offset, num_values, stride, dest);
108
15
    case 4:
109
15
        return byte_stream_split_decode_scalar<4>(src, width, offset, num_values, stride, dest);
110
15
    case 8:
111
15
        return byte_stream_split_decode_scalar<8>(src, width, offset, num_values, stride, dest);
112
0
    case 16:
113
0
        return byte_stream_split_decode_scalar<16>(src, width, offset, num_values, stride, dest);
114
36
    }
115
1
    return byte_stream_split_decode_scalar_dynamic(src, width, offset, num_values, stride, dest);
116
36
}
117
118
} // namespace doris