Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/byte_stream_split.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/byte_stream_split.h"
19
20
#include <glog/logging.h>
21
22
#include <array>
23
#include <bit> // IWYU pragma: keep
24
#include <cstring>
25
#include <vector>
26
27
namespace doris {
28
29
// Interleaves `width` byte streams into `dest`.
//
// Byte i of stream s (read through src_streams[s]) is written to
// dest[s + i * width], so `dest` receives `nvalues` consecutive values of
// `width` bytes each. `dest` must have room for width * nvalues bytes and every
// stream must provide at least `nvalues` readable bytes.
//
// Side effect: each pointer in `src_streams` is advanced by kBlockSize for every
// full block processed; the pointers are not advanced for the trailing
// (< kBlockSize) values.
inline void do_merge_streams(const uint8_t** src_streams, int width, int64_t nvalues,
                             uint8_t* dest) {
    // Value empirically chosen to provide the best performance on the author's machine
    constexpr int kBlockSize = 128;

    // Blocked pass: handle kBlockSize values of every stream before moving on, so
    // the destination region being written stays small (width * kBlockSize bytes).
    while (nvalues >= kBlockSize) {
        for (int stream = 0; stream < width; ++stream) {
            // Take kBlockSize bytes from the given stream and spread them
            // to their logical places in destination. A plain byte copy is
            // independent of the host's endianness, so it is correct on every
            // target (including ones that are neither little- nor big-endian).
            const uint8_t* src = src_streams[stream];
            for (int i = 0; i < kBlockSize; ++i) {
                dest[stream + i * width] = src[i];
            }
            src_streams[stream] += kBlockSize;
        }
        dest += width * kBlockSize;
        nvalues -= kBlockSize;
    }

    // Epilog: the remaining (< kBlockSize) values of every stream.
    for (int stream = 0; stream < width; ++stream) {
        const uint8_t* src = src_streams[stream];
        for (int64_t i = 0; i < nvalues; ++i) {
            dest[stream + i * width] = src[i];
        }
    }
}
76
77
template <int kNumStreams>
78
void byte_stream_split_decode_scalar(const uint8_t* src, int width, int64_t offset,
79
23
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
23
    DCHECK(width == kNumStreams);
81
23
    std::array<const uint8_t*, kNumStreams> src_streams;
82
155
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
132
        src_streams[stream] = &src[stream * stride + offset];
84
132
    }
85
23
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
23
}
_ZN5doris31byte_stream_split_decode_scalarILi2EEEvPKhilllPh
Line
Count
Source
79
2
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
2
    DCHECK(width == kNumStreams);
81
2
    std::array<const uint8_t*, kNumStreams> src_streams;
82
6
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
4
        src_streams[stream] = &src[stream * stride + offset];
84
4
    }
85
2
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
2
}
_ZN5doris31byte_stream_split_decode_scalarILi4EEEvPKhilllPh
Line
Count
Source
79
10
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
10
    DCHECK(width == kNumStreams);
81
10
    std::array<const uint8_t*, kNumStreams> src_streams;
82
50
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
40
        src_streams[stream] = &src[stream * stride + offset];
84
40
    }
85
10
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
10
}
_ZN5doris31byte_stream_split_decode_scalarILi8EEEvPKhilllPh
Line
Count
Source
79
11
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
11
    DCHECK(width == kNumStreams);
81
11
    std::array<const uint8_t*, kNumStreams> src_streams;
82
99
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
88
        src_streams[stream] = &src[stream * stride + offset];
84
88
    }
85
11
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
11
}
Unexecuted instantiation: _ZN5doris31byte_stream_split_decode_scalarILi16EEEvPKhilllPh
87
88
inline void byte_stream_split_decode_scalar_dynamic(const uint8_t* src, int width, int64_t offset,
89
                                                    int64_t num_values, int64_t stride,
90
0
                                                    uint8_t* dest) {
91
0
    std::vector<const uint8_t*> src_streams;
92
0
    src_streams.resize(width);
93
0
    for (int stream = 0; stream < width; ++stream) {
94
0
        src_streams[stream] = &src[stream * stride + offset];
95
0
    }
96
0
    do_merge_streams(src_streams.data(), width, num_values, dest);
97
0
}
98
99
// TODO: optimize using simd: https://github.com/apache/arrow/pull/38529
100
void byte_stream_split_decode(const uint8_t* src, int width, int64_t offset, int64_t num_values,
101
25
                              int64_t stride, uint8_t* dest) {
102
25
    switch (width) {
103
2
    case 1:
104
2
        memcpy(dest, src + offset * width, num_values);
105
2
        return;
106
2
    case 2:
107
2
        return byte_stream_split_decode_scalar<2>(src, width, offset, num_values, stride, dest);
108
10
    case 4:
109
10
        return byte_stream_split_decode_scalar<4>(src, width, offset, num_values, stride, dest);
110
11
    case 8:
111
11
        return byte_stream_split_decode_scalar<8>(src, width, offset, num_values, stride, dest);
112
0
    case 16:
113
0
        return byte_stream_split_decode_scalar<16>(src, width, offset, num_values, stride, dest);
114
25
    }
115
0
    return byte_stream_split_decode_scalar_dynamic(src, width, offset, num_values, stride, dest);
116
25
}
117
118
} // namespace doris