Coverage Report

Created: 2026-05-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/aggregate/aggregate_function_window_funnel.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// This file is copied from
19
// https://github.com/ClickHouse/ClickHouse/blob/master/AggregateFunctionWindowFunnel.h
20
// and modified by Doris
21
22
#pragma once
23
24
#include <gen_cpp/data.pb.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <iterator>
29
#include <memory>
30
#include <type_traits>
31
#include <utility>
32
33
#include "common/cast_set.h"
34
#include "common/exception.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/binary_cast.hpp"
38
#include "core/column/column_string.h"
39
#include "core/data_type/data_type_number.h"
40
#include "core/types.h"
41
#include "core/value/vdatetime_value.h"
42
#include "exec/sort/sort_block.h"
43
#include "exprs/aggregate/aggregate_function.h"
44
#include "util/simd/bits.h"
45
#include "util/var_int.h"
46
47
namespace doris {
48
class Arena;
49
class BufferReadable;
50
class BufferWritable;
51
class IColumn;
52
} // namespace doris
53
54
namespace doris {
55
56
enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE };
57
58
252
inline WindowFunnelMode string_to_window_funnel_mode(const String& string) {
59
252
    if (string == "default") {
60
95
        return WindowFunnelMode::DEFAULT;
61
157
    } else if (string == "deduplication") {
62
36
        return WindowFunnelMode::DEDUPLICATION;
63
121
    } else if (string == "fixed") {
64
19
        return WindowFunnelMode::FIXED;
65
102
    } else if (string == "increase") {
66
18
        return WindowFunnelMode::INCREASE;
67
84
    } else {
68
84
        return WindowFunnelMode::INVALID;
69
84
    }
70
252
}
71
72
template <PrimitiveType T>
73
struct DataValue {
74
    using TimestampEvent = std::vector<ColumnUInt8::Container>;
75
    using DateValueType = typename PrimitiveTypeTraits<T>::CppType;
76
    std::vector<DateValueType> dt;
77
    TimestampEvent event_columns_data;
78
    bool operator<(const DataValue& other) const { return dt < other.dt; }
79
2
    void clear() {
80
2
        dt.clear();
81
8
        for (auto& data : event_columns_data) {
82
8
            data.clear();
83
8
        }
84
2
    }
_ZN5doris9DataValueILNS_13PrimitiveTypeE26EE5clearEv
Line
Count
Source
79
2
    void clear() {
80
2
        dt.clear();
81
8
        for (auto& data : event_columns_data) {
82
8
            data.clear();
83
8
        }
84
2
    }
Unexecuted instantiation: _ZN5doris9DataValueILNS_13PrimitiveTypeE42EE5clearEv
85
72
    auto size() const { return dt.size(); }
_ZNK5doris9DataValueILNS_13PrimitiveTypeE26EE4sizeEv
Line
Count
Source
85
72
    auto size() const { return dt.size(); }
Unexecuted instantiation: _ZNK5doris9DataValueILNS_13PrimitiveTypeE42EE4sizeEv
86
11
    bool empty() const { return dt.empty(); }
_ZNK5doris9DataValueILNS_13PrimitiveTypeE26EE5emptyEv
Line
Count
Source
86
11
    bool empty() const { return dt.empty(); }
Unexecuted instantiation: _ZNK5doris9DataValueILNS_13PrimitiveTypeE42EE5emptyEv
87
    std::string debug_string() const {
88
        std::string result = "\n" + std::to_string(dt.size()) + " " +
89
                             std::to_string(event_columns_data[0].size()) + "\n";
90
        for (size_t i = 0; i < dt.size(); ++i) {
91
            result += dt[i].debug_string() + " ,";
92
            for (const auto& event : event_columns_data) {
93
                result += std::to_string(event[i]) + ",";
94
            }
95
            result += "\n";
96
        }
97
        return result;
98
    }
99
};
100
101
template <PrimitiveType T>
102
struct WindowFunnelState {
103
    static constexpr PrimitiveType PType = T;
104
    using NativeType = typename PrimitiveTypeTraits<T>::StorageFieldType;
105
    using DateValueType = typename PrimitiveTypeTraits<T>::CppType;
106
    int event_count = 0;
107
    int64_t window;
108
    bool enable_mode;
109
    WindowFunnelMode window_funnel_mode;
110
    DataValue<T> events_list;
111
112
34
    WindowFunnelState() {
113
34
        event_count = 0;
114
34
        window = 0;
115
34
        window_funnel_mode = WindowFunnelMode::INVALID;
116
34
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EEC2Ev
Line
Count
Source
112
34
    WindowFunnelState() {
113
34
        event_count = 0;
114
34
        window = 0;
115
34
        window_funnel_mode = WindowFunnelMode::INVALID;
116
34
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EEC2Ev
117
34
    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
118
34
        event_count = arg_event_count;
119
34
        events_list.event_columns_data.resize(event_count);
120
34
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EEC2Ei
Line
Count
Source
117
34
    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
118
34
        event_count = arg_event_count;
119
34
        events_list.event_columns_data.resize(event_count);
120
34
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EEC2Ei
121
122
0
    void reset() { events_list.clear(); }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5resetEv
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5resetEv
123
124
84
    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
125
84
        window = win;
126
84
        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
127
84
        events_list.dt.emplace_back(
128
84
                assert_cast<const typename PrimitiveTypeTraits<PType>::ColumnType&>(*arg_columns[2])
129
84
                        .get_data()[row_num]);
130
420
        for (int i = 0; i < event_count; i++) {
131
336
            events_list.event_columns_data[i].emplace_back(
132
336
                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]);
133
336
        }
134
84
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE3addEPPKNS_7IColumnEllNS_16WindowFunnelModeE
Line
Count
Source
124
84
    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
125
84
        window = win;
126
84
        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
127
84
        events_list.dt.emplace_back(
128
84
                assert_cast<const typename PrimitiveTypeTraits<PType>::ColumnType&>(*arg_columns[2])
129
84
                        .get_data()[row_num]);
130
420
        for (int i = 0; i < event_count; i++) {
131
336
            events_list.event_columns_data[i].emplace_back(
132
336
                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]);
133
336
        }
134
84
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE3addEPPKNS_7IColumnEllNS_16WindowFunnelModeE
135
136
    // todo: rethink this sort method.
137
24
    void sort() {
138
24
        auto num = events_list.size();
139
24
        std::vector<size_t> indices(num);
140
24
        std::iota(indices.begin(), indices.end(), 0);
141
24
        std::sort(indices.begin(), indices.end(),
142
102
                  [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });
_ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlmmE_clEmm
Line
Count
Source
142
102
                  [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });
Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlmmE_clEmm
143
144
120
        auto reorder = [&indices, &num](auto& vec) {
145
120
            std::decay_t<decltype(vec)> temp;
146
120
            temp.resize(num);
147
560
            for (auto i = 0; i < num; i++) {
148
440
                temp[i] = vec[indices[i]];
149
440
            }
150
120
            std::swap(vec, temp);
151
120
        };
_ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlRT_E_clISt6vectorINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEESaISA_EEEEDaS4_
Line
Count
Source
144
24
        auto reorder = [&indices, &num](auto& vec) {
145
24
            std::decay_t<decltype(vec)> temp;
146
24
            temp.resize(num);
147
112
            for (auto i = 0; i < num; i++) {
148
88
                temp[i] = vec[indices[i]];
149
88
            }
150
24
            std::swap(vec, temp);
151
24
        };
_ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS4_
Line
Count
Source
144
96
        auto reorder = [&indices, &num](auto& vec) {
145
96
            std::decay_t<decltype(vec)> temp;
146
96
            temp.resize(num);
147
448
            for (auto i = 0; i < num; i++) {
148
352
                temp[i] = vec[indices[i]];
149
352
            }
150
96
            std::swap(vec, temp);
151
96
        };
Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlRT_E_clISt6vectorINS_16TimestampTzValueESaIS8_EEEEDaS4_
Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS4_
152
153
24
        reorder(events_list.dt);
154
96
        for (auto& inner_vec : events_list.event_columns_data) {
155
96
            reorder(inner_vec);
156
96
        }
157
24
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEv
Line
Count
Source
137
24
    void sort() {
138
24
        auto num = events_list.size();
139
24
        std::vector<size_t> indices(num);
140
24
        std::iota(indices.begin(), indices.end(), 0);
141
24
        std::sort(indices.begin(), indices.end(),
142
24
                  [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });
143
144
24
        auto reorder = [&indices, &num](auto& vec) {
145
24
            std::decay_t<decltype(vec)> temp;
146
24
            temp.resize(num);
147
24
            for (auto i = 0; i < num; i++) {
148
24
                temp[i] = vec[indices[i]];
149
24
            }
150
24
            std::swap(vec, temp);
151
24
        };
152
153
24
        reorder(events_list.dt);
154
96
        for (auto& inner_vec : events_list.event_columns_data) {
155
96
            reorder(inner_vec);
156
96
        }
157
24
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEv
158
159
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
160
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
161
36
        int matched_count = 0;
162
36
        DateValueType end_timestamp;
163
164
36
        if (window < 0) {
165
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
166
0
                            "the sliding time window must be a positive integer, but got: {}",
167
0
                            window);
168
0
        }
169
36
        TimeInterval interval(SECOND, window, false);
170
36
        int column_idx = 0;
171
36
        const auto& timestamp_data = events_list.dt;
172
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
173
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
174
36
        start_row = match_row + 1;
175
36
        if (match_row < row_count) {
176
22
            auto prev_timestamp = timestamp_data[match_row];
177
22
            end_timestamp = prev_timestamp;
178
22
            end_timestamp.template date_add_interval<SECOND>(interval);
179
180
22
            matched_count++;
181
22
            column_idx++;
182
22
            auto last_match_row = match_row;
183
22
            ++match_row;
184
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
185
54
                const auto& event_data = events_list.event_columns_data[column_idx];
186
54
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
187
0
                    if (event_data[match_row] == 1) {
188
0
                        auto current_timestamp = timestamp_data[match_row];
189
0
                        if (current_timestamp <= end_timestamp) {
190
0
                            matched_count++;
191
0
                            continue;
192
0
                        }
193
0
                    }
194
0
                    break;
195
0
                }
196
0
                match_row = simd::find_one(event_data.data(), match_row, row_count);
197
54
                if (match_row < row_count) {
198
54
                    auto current_timestamp = timestamp_data[match_row];
199
54
                    bool is_matched = current_timestamp <= end_timestamp;
200
54
                    if (is_matched) {
201
40
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
202
0
                            is_matched = current_timestamp > prev_timestamp;
203
0
                        }
204
40
                    }
205
54
                    if (!is_matched) {
206
14
                        break;
207
14
                    }
208
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
209
0
                        prev_timestamp = timestamp_data[match_row];
210
0
                    }
211
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
212
0
                        bool is_dup = false;
213
0
                        if (match_row != last_match_row + 1) {
214
0
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
215
0
                                 tmp_column_idx++) {
216
0
                                const auto& tmp_event_data =
217
0
                                        events_list.event_columns_data[tmp_column_idx].data();
218
0
                                auto dup_match_row = simd::find_one(tmp_event_data,
219
0
                                                                    last_match_row + 1, match_row);
220
0
                                if (dup_match_row < match_row) {
221
0
                                    is_dup = true;
222
0
                                    break;
223
0
                                }
224
0
                            }
225
0
                        }
226
0
                        if (is_dup) {
227
0
                            break;
228
0
                        }
229
0
                        last_match_row = match_row;
230
0
                    }
231
0
                    matched_count++;
232
40
                } else {
233
0
                    break;
234
0
                }
235
54
            }
236
22
        }
237
0
        return matched_count;
238
0
    }
_ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm
Line
Count
Source
160
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
161
36
        int matched_count = 0;
162
36
        DateValueType end_timestamp;
163
164
36
        if (window < 0) {
165
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
166
0
                            "the sliding time window must be a positive integer, but got: {}",
167
0
                            window);
168
0
        }
169
36
        TimeInterval interval(SECOND, window, false);
170
36
        int column_idx = 0;
171
36
        const auto& timestamp_data = events_list.dt;
172
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
173
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
174
36
        start_row = match_row + 1;
175
36
        if (match_row < row_count) {
176
22
            auto prev_timestamp = timestamp_data[match_row];
177
22
            end_timestamp = prev_timestamp;
178
22
            end_timestamp.template date_add_interval<SECOND>(interval);
179
180
22
            matched_count++;
181
22
            column_idx++;
182
22
            auto last_match_row = match_row;
183
22
            ++match_row;
184
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
185
54
                const auto& event_data = events_list.event_columns_data[column_idx];
186
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
187
                    if (event_data[match_row] == 1) {
188
                        auto current_timestamp = timestamp_data[match_row];
189
                        if (current_timestamp <= end_timestamp) {
190
                            matched_count++;
191
                            continue;
192
                        }
193
                    }
194
                    break;
195
                }
196
54
                match_row = simd::find_one(event_data.data(), match_row, row_count);
197
54
                if (match_row < row_count) {
198
54
                    auto current_timestamp = timestamp_data[match_row];
199
54
                    bool is_matched = current_timestamp <= end_timestamp;
200
54
                    if (is_matched) {
201
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
202
                            is_matched = current_timestamp > prev_timestamp;
203
                        }
204
40
                    }
205
54
                    if (!is_matched) {
206
14
                        break;
207
14
                    }
208
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
209
                        prev_timestamp = timestamp_data[match_row];
210
                    }
211
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
212
                        bool is_dup = false;
213
                        if (match_row != last_match_row + 1) {
214
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
215
                                 tmp_column_idx++) {
216
                                const auto& tmp_event_data =
217
                                        events_list.event_columns_data[tmp_column_idx].data();
218
                                auto dup_match_row = simd::find_one(tmp_event_data,
219
                                                                    last_match_row + 1, match_row);
220
                                if (dup_match_row < match_row) {
221
                                    is_dup = true;
222
                                    break;
223
                                }
224
                            }
225
                        }
226
                        if (is_dup) {
227
                            break;
228
                        }
229
                        last_match_row = match_row;
230
                    }
231
40
                    matched_count++;
232
40
                } else {
233
0
                    break;
234
0
                }
235
54
            }
236
22
        }
237
36
        return matched_count;
238
36
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm
239
240
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
241
22
    int _get_internal() const {
242
22
        size_t start_row = 0;
243
22
        int max_found_event_count = 0;
244
22
        auto row_count = events_list.size();
245
50
        while (start_row < row_count) {
246
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
247
36
            if (found_event_count == event_count) {
248
8
                return found_event_count;
249
8
            }
250
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
251
28
        }
252
14
        return max_found_event_count;
253
22
    }
_ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE1EEEiv
Line
Count
Source
241
22
    int _get_internal() const {
242
22
        size_t start_row = 0;
243
22
        int max_found_event_count = 0;
244
22
        auto row_count = events_list.size();
245
50
        while (start_row < row_count) {
246
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
247
36
            if (found_event_count == event_count) {
248
8
                return found_event_count;
249
8
            }
250
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
251
28
        }
252
14
        return max_found_event_count;
253
22
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE2EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE3EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE4EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE1EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE2EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE3EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE4EEEiv
254
24
    int get() const {
255
24
        auto row_count = events_list.size();
256
24
        if (event_count == 0 || row_count == 0) {
257
2
            return 0;
258
2
        }
259
22
        switch (window_funnel_mode) {
260
22
        case WindowFunnelMode::DEFAULT:
261
22
            return _get_internal<WindowFunnelMode::DEFAULT>();
262
0
        case WindowFunnelMode::DEDUPLICATION:
263
0
            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
264
0
        case WindowFunnelMode::FIXED:
265
0
            return _get_internal<WindowFunnelMode::FIXED>();
266
0
        case WindowFunnelMode::INCREASE:
267
0
            return _get_internal<WindowFunnelMode::INCREASE>();
268
0
        default:
269
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode");
270
0
            return 0;
271
22
        }
272
22
    }
_ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE3getEv
Line
Count
Source
254
24
    int get() const {
255
24
        auto row_count = events_list.size();
256
24
        if (event_count == 0 || row_count == 0) {
257
2
            return 0;
258
2
        }
259
22
        switch (window_funnel_mode) {
260
22
        case WindowFunnelMode::DEFAULT:
261
22
            return _get_internal<WindowFunnelMode::DEFAULT>();
262
0
        case WindowFunnelMode::DEDUPLICATION:
263
0
            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
264
0
        case WindowFunnelMode::FIXED:
265
0
            return _get_internal<WindowFunnelMode::FIXED>();
266
0
        case WindowFunnelMode::INCREASE:
267
0
            return _get_internal<WindowFunnelMode::INCREASE>();
268
0
        default:
269
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode");
270
0
            return 0;
271
22
        }
272
22
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE3getEv
273
274
11
    void merge(const WindowFunnelState<T>& other) {
275
11
        if (other.events_list.empty()) {
276
1
            return;
277
1
        }
278
10
        events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt),
279
10
                              std::end(other.events_list.dt));
280
50
        for (size_t i = 0; i < event_count; i++) {
281
40
            events_list.event_columns_data[i].insert(
282
40
                    std::end(events_list.event_columns_data[i]),
283
40
                    std::begin(other.events_list.event_columns_data[i]),
284
40
                    std::end(other.events_list.event_columns_data[i]));
285
40
        }
286
10
        event_count = event_count > 0 ? event_count : other.event_count;
287
10
        window = window > 0 ? window : other.window;
288
10
        if (enable_mode) {
289
0
            window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
290
0
                                         ? other.window_funnel_mode
291
0
                                         : window_funnel_mode;
292
10
        } else {
293
10
            window_funnel_mode = WindowFunnelMode::DEFAULT;
294
10
        }
295
10
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5mergeERKS2_
Line
Count
Source
274
11
    void merge(const WindowFunnelState<T>& other) {
275
11
        if (other.events_list.empty()) {
276
1
            return;
277
1
        }
278
10
        events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt),
279
10
                              std::end(other.events_list.dt));
280
50
        for (size_t i = 0; i < event_count; i++) {
281
40
            events_list.event_columns_data[i].insert(
282
40
                    std::end(events_list.event_columns_data[i]),
283
40
                    std::begin(other.events_list.event_columns_data[i]),
284
40
                    std::end(other.events_list.event_columns_data[i]));
285
40
        }
286
10
        event_count = event_count > 0 ? event_count : other.event_count;
287
10
        window = window > 0 ? window : other.window;
288
10
        if (enable_mode) {
289
0
            window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
290
0
                                         ? other.window_funnel_mode
291
0
                                         : window_funnel_mode;
292
10
        } else {
293
10
            window_funnel_mode = WindowFunnelMode::DEFAULT;
294
10
        }
295
10
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5mergeERKS2_
296
297
2
    void write(BufferWritable& out) const {
298
2
        write_var_int(event_count, out);
299
2
        write_var_int(window, out);
300
2
        if (enable_mode) {
301
0
            write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
302
0
                          out);
303
0
        }
304
2
        auto size = events_list.size();
305
2
        write_var_int(size, out);
306
4
        for (const auto& timestamp : events_list.dt) {
307
4
            write_var_int(timestamp.to_date_int_val(), out);
308
4
        }
309
10
        for (int64_t i = 0; i < event_count; i++) {
310
8
            const auto& event_columns_data = events_list.event_columns_data[i];
311
16
            for (auto event : event_columns_data) {
312
16
                write_var_int(event, out);
313
16
            }
314
8
        }
315
2
    }
_ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5writeERNS_14BufferWritableE
Line
Count
Source
297
2
    void write(BufferWritable& out) const {
298
2
        write_var_int(event_count, out);
299
2
        write_var_int(window, out);
300
2
        if (enable_mode) {
301
0
            write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
302
0
                          out);
303
0
        }
304
2
        auto size = events_list.size();
305
2
        write_var_int(size, out);
306
4
        for (const auto& timestamp : events_list.dt) {
307
4
            write_var_int(timestamp.to_date_int_val(), out);
308
4
        }
309
10
        for (int64_t i = 0; i < event_count; i++) {
310
8
            const auto& event_columns_data = events_list.event_columns_data[i];
311
16
            for (auto event : event_columns_data) {
312
16
                write_var_int(event, out);
313
16
            }
314
8
        }
315
2
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5writeERNS_14BufferWritableE
316
317
2
    void read(BufferReadable& in) {
318
2
        int64_t event_level;
319
2
        read_var_int(event_level, in);
320
2
        event_count = (int)event_level;
321
2
        read_var_int(window, in);
322
2
        window_funnel_mode = WindowFunnelMode::DEFAULT;
323
2
        if (enable_mode) {
324
0
            int64_t mode;
325
0
            read_var_int(mode, in);
326
0
            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
327
0
        }
328
2
        int64_t size = 0;
329
2
        read_var_int(size, in);
330
2
        events_list.clear();
331
2
        events_list.dt.resize(size);
332
6
        for (auto i = 0; i < size; i++) {
333
4
            Int64 timestamp = 0;
334
4
            read_var_int(timestamp, in);
335
4
            events_list.dt[i] = DateValueType(static_cast<UInt64>(timestamp));
336
4
        }
337
2
        events_list.event_columns_data.resize(event_count);
338
10
        for (int64_t i = 0; i < event_count; i++) {
339
8
            auto& event_columns_data = events_list.event_columns_data[i];
340
8
            event_columns_data.resize(size);
341
24
            for (auto j = 0; j < size; j++) {
342
16
                Int64 temp_value;
343
16
                read_var_int(temp_value, in);
344
16
                event_columns_data[j] = static_cast<UInt8>(temp_value);
345
16
            }
346
8
        }
347
2
    }
_ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4readERNS_14BufferReadableE
Line
Count
Source
317
2
    void read(BufferReadable& in) {
318
2
        int64_t event_level;
319
2
        read_var_int(event_level, in);
320
2
        event_count = (int)event_level;
321
2
        read_var_int(window, in);
322
2
        window_funnel_mode = WindowFunnelMode::DEFAULT;
323
2
        if (enable_mode) {
324
0
            int64_t mode;
325
0
            read_var_int(mode, in);
326
0
            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
327
0
        }
328
2
        int64_t size = 0;
329
2
        read_var_int(size, in);
330
2
        events_list.clear();
331
2
        events_list.dt.resize(size);
332
6
        for (auto i = 0; i < size; i++) {
333
4
            Int64 timestamp = 0;
334
4
            read_var_int(timestamp, in);
335
4
            events_list.dt[i] = DateValueType(static_cast<UInt64>(timestamp));
336
4
        }
337
2
        events_list.event_columns_data.resize(event_count);
338
10
        for (int64_t i = 0; i < event_count; i++) {
339
8
            auto& event_columns_data = events_list.event_columns_data[i];
340
8
            event_columns_data.resize(size);
341
24
            for (auto j = 0; j < size; j++) {
342
16
                Int64 temp_value;
343
16
                read_var_int(temp_value, in);
344
16
                event_columns_data[j] = static_cast<UInt8>(temp_value);
345
16
            }
346
8
        }
347
2
    }
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4readERNS_14BufferReadableE
348
};
349
350
template <PrimitiveType T>
351
class AggregateFunctionWindowFunnel final
352
        : public IAggregateFunctionDataHelper<WindowFunnelState<T>,
353
                                              AggregateFunctionWindowFunnel<T>>,
354
          MultiExpression,
355
          NullableAggregateFunction {
356
public:
357
    AggregateFunctionWindowFunnel(const DataTypes& argument_types_)
358
6
            : IAggregateFunctionDataHelper<WindowFunnelState<T>, AggregateFunctionWindowFunnel<T>>(
359
6
                      argument_types_) {}
_ZN5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EEC2ERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE
Line
Count
Source
358
6
            : IAggregateFunctionDataHelper<WindowFunnelState<T>, AggregateFunctionWindowFunnel<T>>(
359
6
                      argument_types_) {}
Unexecuted instantiation: _ZN5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EEC2ERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE
360
361
34
    void create(AggregateDataPtr __restrict place) const override {
362
34
        auto data = new (place) WindowFunnelState<T>(
363
34
                cast_set<int>(IAggregateFunction::get_argument_types().size() - 3));
364
        /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
365
34
        data->enable_mode = IAggregateFunction::version >= 3;
366
34
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE6createEPc
Line
Count
Source
361
34
    void create(AggregateDataPtr __restrict place) const override {
362
34
        auto data = new (place) WindowFunnelState<T>(
363
34
                cast_set<int>(IAggregateFunction::get_argument_types().size() - 3));
364
        /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
365
34
        data->enable_mode = IAggregateFunction::version >= 3;
366
34
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE6createEPc
367
368
0
    String get_name() const override { return "window_funnel"; }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE8get_nameB5cxx11Ev
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE8get_nameB5cxx11Ev
369
370
0
    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE15get_return_typeEv
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE15get_return_typeEv
371
372
0
    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE5resetEPc
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE5resetEPc
373
374
    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
375
84
             Arena&) const override {
376
84
        const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num];
377
84
        StringRef mode = columns[1]->get_data_at(row_num);
378
84
        this->data(place).add(columns, row_num, window,
379
84
                              string_to_window_funnel_mode(mode.to_string()));
380
84
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE3addEPcPPKNS_7IColumnElRNS_5ArenaE
Line
Count
Source
375
84
             Arena&) const override {
376
84
        const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num];
377
84
        StringRef mode = columns[1]->get_data_at(row_num);
378
84
        this->data(place).add(columns, row_num, window,
379
84
                              string_to_window_funnel_mode(mode.to_string()));
380
84
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE3addEPcPPKNS_7IColumnElRNS_5ArenaE
381
382
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
383
11
               Arena&) const override {
384
11
        this->data(place).merge(this->data(rhs));
385
11
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE5mergeEPcPKcRNS_5ArenaE
Line
Count
Source
383
11
               Arena&) const override {
384
11
        this->data(place).merge(this->data(rhs));
385
11
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE5mergeEPcPKcRNS_5ArenaE
386
387
2
    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
388
2
        this->data(place).write(buf);
389
2
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE9serializeEPKcRNS_14BufferWritableE
Line
Count
Source
387
2
    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
388
2
        this->data(place).write(buf);
389
2
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE9serializeEPKcRNS_14BufferWritableE
390
391
    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
392
2
                     Arena&) const override {
393
2
        this->data(place).read(buf);
394
2
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE11deserializeEPcRNS_14BufferReadableERNS_5ArenaE
Line
Count
Source
392
2
                     Arena&) const override {
393
2
        this->data(place).read(buf);
394
2
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE11deserializeEPcRNS_14BufferReadableERNS_5ArenaE
395
396
24
    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
397
        // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr.
398
24
        this->data(const_cast<AggregateDataPtr>(place)).sort();
399
24
        assert_cast<ColumnInt32&>(to).get_data().push_back(
400
24
                IAggregateFunctionDataHelper<WindowFunnelState<T>,
401
24
                                             AggregateFunctionWindowFunnel<T>>::data(place)
402
24
                        .get());
403
24
    }
_ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE18insert_result_intoEPKcRNS_7IColumnE
Line
Count
Source
396
24
    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
397
        // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr.
398
24
        this->data(const_cast<AggregateDataPtr>(place)).sort();
399
24
        assert_cast<ColumnInt32&>(to).get_data().push_back(
400
24
                IAggregateFunctionDataHelper<WindowFunnelState<T>,
401
24
                                             AggregateFunctionWindowFunnel<T>>::data(place)
402
24
                        .get());
403
24
    }
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE18insert_result_intoEPKcRNS_7IColumnE
404
405
protected:
406
    using IAggregateFunction::version;
407
};
408
} // namespace doris