Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/byte_stream_split.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/byte_stream_split.h"
19
20
#include <glog/logging.h>
21
22
#include <array>
23
#include <bit> // IWYU pragma: keep
24
#include <cstring>
25
#include <vector>
26
27
namespace doris {
28
29
// Interleaves `width` separate byte streams into `dest`, so that byte `i` of
// stream `s` lands at `dest[s + i * width]`.
//
// src_streams: array of `width` pointers, one per stream; each stream must
//              provide at least `nvalues` readable bytes. The pointers are
//              advanced in place by every full block that gets consumed.
// width:       number of streams (bytes per output value). Nothing is written
//              when it is not positive.
// nvalues:     number of bytes to take from each stream.
// dest:        output buffer receiving `width * nvalues` bytes.
inline void do_merge_streams(const uint8_t** src_streams, int width, int64_t nvalues,
                             uint8_t* dest) {
    // Value empirically chosen to provide the best performance on the author's machine
    constexpr int kBlockSize = 128;
    constexpr int kChunkSize = 8;
    static_assert(kBlockSize % kChunkSize == 0, "a block must be a whole number of chunks");

    if (width <= 0) {
        // No streams to merge; also avoids moving `dest` backwards below.
        return;
    }

    // All destination offsets are computed in 64 bits so that a large width
    // cannot overflow `int` arithmetic.
    const int64_t value_width = width;

    while (nvalues >= kBlockSize) {
        for (int stream = 0; stream < width; ++stream) {
            // Take kBlockSize bytes from the given stream and spread them
            // to their logical places in destination.
            const uint8_t* src = src_streams[stream];
            uint8_t* out = dest + stream;
            for (int i = 0; i < kBlockSize; i += kChunkSize) {
                // Snapshot eight source bytes before storing anything: `dest`
                // may alias `src` as far as the compiler knows, so a local copy
                // lets it use one wide load. Scattering the bytes in memory
                // order needs no endianness special-casing.
                uint8_t chunk[kChunkSize];
                std::memcpy(chunk, src + i, sizeof(chunk));
                out[(i + 0) * value_width] = chunk[0];
                out[(i + 1) * value_width] = chunk[1];
                out[(i + 2) * value_width] = chunk[2];
                out[(i + 3) * value_width] = chunk[3];
                out[(i + 4) * value_width] = chunk[4];
                out[(i + 5) * value_width] = chunk[5];
                out[(i + 6) * value_width] = chunk[6];
                out[(i + 7) * value_width] = chunk[7];
            }
            src_streams[stream] += kBlockSize;
        }
        dest += value_width * kBlockSize;
        nvalues -= kBlockSize;
    }

    // Epilog: fewer than kBlockSize values remain, copy them byte by byte.
    for (int stream = 0; stream < width; ++stream) {
        const uint8_t* src = src_streams[stream];
        for (int64_t i = 0; i < nvalues; ++i) {
            dest[stream + i * value_width] = src[i];
        }
    }
}
76
77
template <int kNumStreams>
78
void byte_stream_split_decode_scalar(const uint8_t* src, int width, int64_t offset,
79
43
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
43
    DCHECK(width == kNumStreams);
81
43
    std::array<const uint8_t*, kNumStreams> src_streams;
82
283
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
240
        src_streams[stream] = &src[stream * stride + offset];
84
240
    }
85
43
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
43
}
_ZN5doris31byte_stream_split_decode_scalarILi2EEEvPKhilllPh
Line
Count
Source
79
4
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
4
    DCHECK(width == kNumStreams);
81
4
    std::array<const uint8_t*, kNumStreams> src_streams;
82
12
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
8
        src_streams[stream] = &src[stream * stride + offset];
84
8
    }
85
4
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
4
}
_ZN5doris31byte_stream_split_decode_scalarILi4EEEvPKhilllPh
Line
Count
Source
79
20
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
20
    DCHECK(width == kNumStreams);
81
20
    std::array<const uint8_t*, kNumStreams> src_streams;
82
100
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
80
        src_streams[stream] = &src[stream * stride + offset];
84
80
    }
85
20
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
20
}
_ZN5doris31byte_stream_split_decode_scalarILi8EEEvPKhilllPh
Line
Count
Source
79
19
                                     int64_t num_values, int64_t stride, uint8_t* dest) {
80
19
    DCHECK(width == kNumStreams);
81
19
    std::array<const uint8_t*, kNumStreams> src_streams;
82
171
    for (int stream = 0; stream < kNumStreams; ++stream) {
83
152
        src_streams[stream] = &src[stream * stride + offset];
84
152
    }
85
19
    do_merge_streams(src_streams.data(), kNumStreams, num_values, dest);
86
19
}
Unexecuted instantiation: _ZN5doris31byte_stream_split_decode_scalarILi16EEEvPKhilllPh
87
88
inline void byte_stream_split_decode_scalar_dynamic(const uint8_t* src, int width, int64_t offset,
89
                                                    int64_t num_values, int64_t stride,
90
2
                                                    uint8_t* dest) {
91
2
    std::vector<const uint8_t*> src_streams;
92
2
    src_streams.resize(width);
93
12
    for (int stream = 0; stream < width; ++stream) {
94
10
        src_streams[stream] = &src[stream * stride + offset];
95
10
    }
96
2
    do_merge_streams(src_streams.data(), width, num_values, dest);
97
2
}
98
99
// TODO: optimize using simd: https://github.com/apache/arrow/pull/38529
100
void byte_stream_split_decode(const uint8_t* src, int width, int64_t offset, int64_t num_values,
101
47
                              int64_t stride, uint8_t* dest) {
102
47
    switch (width) {
103
2
    case 1:
104
2
        memcpy(dest, src + offset * width, num_values);
105
2
        return;
106
4
    case 2:
107
4
        return byte_stream_split_decode_scalar<2>(src, width, offset, num_values, stride, dest);
108
20
    case 4:
109
20
        return byte_stream_split_decode_scalar<4>(src, width, offset, num_values, stride, dest);
110
19
    case 8:
111
19
        return byte_stream_split_decode_scalar<8>(src, width, offset, num_values, stride, dest);
112
0
    case 16:
113
0
        return byte_stream_split_decode_scalar<16>(src, width, offset, num_values, stride, dest);
114
47
    }
115
2
    return byte_stream_split_decode_scalar_dynamic(src, width, offset, num_values, stride, dest);
116
47
}
117
118
} // namespace doris