Coverage Report

Created: 2026-03-16 12:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/byte_stream_split.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/byte_stream_split.h"
19
20
#include <glog/logging.h>
21
22
#include <array>
23
#include <bit> // IWYU pragma: keep
24
#include <cstring>
25
#include <vector>
26
27
namespace doris {
28
29
inline void do_merge_streams(const uint8_t** src_streams, int width, int64_t nvalues,
30
41
                             uint8_t* dest) {
31
    // Value empirically chosen to provide the best performance on the author's machine
32
41
    constexpr int kBlockSize = 128;
33
34
55
    while (nvalues >= kBlockSize) {
35
84
        for (int stream = 0; stream < width; ++stream) {
36
            // Take kBlockSize bytes from the given stream and spread them
37
            // to their logical places in destination.
38
70
            const uint8_t* src = src_streams[stream];
39
1.19k
            for (int i = 0; i < kBlockSize; i += 8) {
40
1.12k
                uint64_t v;
41
1.12k
                std::memcpy(&v, src + i, sizeof(v));
42
1.12k
                if constexpr (std::endian::native == std::endian::little) {
43
1.12k
                    dest[stream + i * width] = static_cast<uint8_t>(v);
44
1.12k
                    dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 8);
45
1.12k
                    dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 16);
46
1.12k
                    dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 24);
47
1.12k
                    dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 32);
48
1.12k
                    dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 40);
49
1.12k
                    dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 48);
50
1.12k
                    dest[stream + (i + 7) * width] = static_cast<uint8_t>(v >> 56);
51
                } else if constexpr (std::endian::native == std::endian::big) {
52
                    dest[stream + i * width] = static_cast<uint8_t>(v >> 56);
53
                    dest[stream + (i + 1) * width] = static_cast<uint8_t>(v >> 48);
54
                    dest[stream + (i + 2) * width] = static_cast<uint8_t>(v >> 40);
55
                    dest[stream + (i + 3) * width] = static_cast<uint8_t>(v >> 32);
56
                    dest[stream + (i + 4) * width] = static_cast<uint8_t>(v >> 24);
57
                    dest[stream + (i + 5) * width] = static_cast<uint8_t>(v >> 16);
58
                    dest[stream + (i + 6) * width] = static_cast<uint8_t>(v >> 8);
59
                    dest[stream + (i + 7) * width] = static_cast<uint8_t>(v);
60
                }
61
1.12k
            }
62
70
            src_streams[stream] += kBlockSize;
63
70
        }
64
14
        dest += width * kBlockSize;
65
14
        nvalues -= kBlockSize;
66
14
    }
67
68
    // Epilog
69
267
    for (int stream = 0; stream < width; ++stream) {
70
226
        const uint8_t* src = src_streams[stream];
71
7.97k
        for (int64_t i = 0; i < nvalues; ++i) {
72
7.74k
            dest[stream + i * width] = src[i];
73
7.74k
        }
74
226
    }
75
41
}
76
77
template <int kNumStreams>
78
void byte_stream_split_decode_scalar(const uint8_t* src, int width, int64_t offset,
79
39
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
39
    DCHECK(width == kNumStreams);
81
39
    std::array<const uint8_t*, kNumStreams> src_streams;
82
255
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
216
        src_streams[stream] = &src[stream * stride + offset];
84
216
    }
85
39
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
39
}
_ZN5doris31byte_stream_split_decode_scalarILi2EEEvPKhilllPh
Line
Count
Source
79
4
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
4
    DCHECK(width == kNumStreams);
81
4
    std::array<const uint8_t*, kNumStreams> src_streams;
82
12
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
8
        src_streams[stream] = &src[stream * stride + offset];
84
8
    }
85
4
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
4
}
_ZN5doris31byte_stream_split_decode_scalarILi4EEEvPKhilllPh
Line
Count
Source
79
18
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
18
    DCHECK(width == kNumStreams);
81
18
    std::array<const uint8_t*, kNumStreams> src_streams;
82
90
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
72
        src_streams[stream] = &src[stream * stride + offset];
84
72
    }
85
18
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
18
}
_ZN5doris31byte_stream_split_decode_scalarILi8EEEvPKhilllPh
Line
Count
Source
79
17
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
17
    DCHECK(width == kNumStreams);
81
17
    std::array<const uint8_t*, kNumStreams> src_streams;
82
153
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
136
        src_streams[stream] = &src[stream * stride + offset];
84
136
    }
85
17
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
17
}
Unexecuted instantiation: _ZN5doris31byte_stream_split_decode_scalarILi16EEEvPKhilllPh
87
88
inline void byte_stream_split_decode_scalar_dynamic(const uint8_t* src, int width, int64_t offset,
89
                                                    int64_t num_values, int64_t stride,
90
2
                                                    uint8_t* dest) {
91
2
    std::vector<const uint8_t*> src_streams;
92
2
    src_streams.resize(width);
93
12
    for (int stream = 0; stream < width; ++stream) {
94
10
        src_streams[stream] = &src[stream * stride + offset];
95
10
    }
96
2
    do_merge_streams(src_streams.data(), width, num_values, dest);
97
2
}
98
99
// TODO: optimize using simd: https://github.com/apache/arrow/pull/38529
100
void byte_stream_split_decode(const uint8_t* src, int width, int64_t offset, int64_t num_values,
101
43
                              int64_t stride, uint8_t* dest) {
102
43
    switch (width) {
103
2
    case 1:
104
2
        memcpy(dest, src + offset * width, num_values);
105
2
        return;
106
4
    case 2:
107
4
        return byte_stream_split_decode_scalar<2>(src, width, offset, num_values, stride, dest);
108
18
    case 4:
109
18
        return byte_stream_split_decode_scalar<4>(src, width, offset, num_values, stride, dest);
110
17
    case 8:
111
17
        return byte_stream_split_decode_scalar<8>(src, width, offset, num_values, stride, dest);
112
0
    case 16:
113
0
        return byte_stream_split_decode_scalar<16>(src, width, offset, num_values, stride, dest);
114
43
    }
115
2
    return byte_stream_split_decode_scalar_dynamic(src, width, offset, num_values, stride, dest);
116
43
}
117
118
} // namespace doris