be/src/exprs/aggregate/aggregate_function_window_funnel.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // This file is copied from |
19 | | // https://github.com/ClickHouse/ClickHouse/blob/master/AggregateFunctionWindowFunnel.h |
20 | | // and modified by Doris |
21 | | |
22 | | #pragma once |
23 | | |
#include <gen_cpp/data.pb.h>

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "common/cast_set.h"
#include "common/exception.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/binary_cast.hpp"
#include "core/column/column_string.h"
#include "core/data_type/data_type_number.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exec/sort/sort_block.h"
#include "exprs/aggregate/aggregate_function.h"
#include "util/simd/bits.h"
#include "util/var_int.h"
46 | | |
47 | | namespace doris { |
48 | | class Arena; |
49 | | class BufferReadable; |
50 | | class BufferWritable; |
51 | | class IColumn; |
52 | | } // namespace doris |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE }; |
57 | | |
58 | 252 | inline WindowFunnelMode string_to_window_funnel_mode(const String& string) { |
59 | 252 | if (string == "default") { |
60 | 95 | return WindowFunnelMode::DEFAULT; |
61 | 157 | } else if (string == "deduplication") { |
62 | 36 | return WindowFunnelMode::DEDUPLICATION; |
63 | 121 | } else if (string == "fixed") { |
64 | 19 | return WindowFunnelMode::FIXED; |
65 | 102 | } else if (string == "increase") { |
66 | 18 | return WindowFunnelMode::INCREASE; |
67 | 84 | } else { |
68 | 84 | return WindowFunnelMode::INVALID; |
69 | 84 | } |
70 | 252 | } |
71 | | |
72 | | template <PrimitiveType T> |
73 | | struct DataValue { |
74 | | using TimestampEvent = std::vector<ColumnUInt8::Container>; |
75 | | using DateValueType = typename PrimitiveTypeTraits<T>::CppType; |
76 | | std::vector<DateValueType> dt; |
77 | | TimestampEvent event_columns_data; |
78 | | bool operator<(const DataValue& other) const { return dt < other.dt; } |
79 | 2 | void clear() { |
80 | 2 | dt.clear(); |
81 | 8 | for (auto& data : event_columns_data) { |
82 | 8 | data.clear(); |
83 | 8 | } |
84 | 2 | } _ZN5doris9DataValueILNS_13PrimitiveTypeE26EE5clearEv Line | Count | Source | 79 | 2 | void clear() { | 80 | 2 | dt.clear(); | 81 | 8 | for (auto& data : event_columns_data) { | 82 | 8 | data.clear(); | 83 | 8 | } | 84 | 2 | } |
Unexecuted instantiation: _ZN5doris9DataValueILNS_13PrimitiveTypeE42EE5clearEv |
85 | 72 | auto size() const { return dt.size(); }_ZNK5doris9DataValueILNS_13PrimitiveTypeE26EE4sizeEv Line | Count | Source | 85 | 72 | auto size() const { return dt.size(); } |
Unexecuted instantiation: _ZNK5doris9DataValueILNS_13PrimitiveTypeE42EE4sizeEv |
86 | 11 | bool empty() const { return dt.empty(); }_ZNK5doris9DataValueILNS_13PrimitiveTypeE26EE5emptyEv Line | Count | Source | 86 | 11 | bool empty() const { return dt.empty(); } |
Unexecuted instantiation: _ZNK5doris9DataValueILNS_13PrimitiveTypeE42EE5emptyEv |
87 | | std::string debug_string() const { |
88 | | std::string result = "\n" + std::to_string(dt.size()) + " " + |
89 | | std::to_string(event_columns_data[0].size()) + "\n"; |
90 | | for (size_t i = 0; i < dt.size(); ++i) { |
91 | | result += dt[i].debug_string() + " ,"; |
92 | | for (const auto& event : event_columns_data) { |
93 | | result += std::to_string(event[i]) + ","; |
94 | | } |
95 | | result += "\n"; |
96 | | } |
97 | | return result; |
98 | | } |
99 | | }; |
100 | | |
101 | | template <PrimitiveType T> |
102 | | struct WindowFunnelState { |
103 | | static constexpr PrimitiveType PType = T; |
104 | | using NativeType = typename PrimitiveTypeTraits<T>::StorageFieldType; |
105 | | using DateValueType = typename PrimitiveTypeTraits<T>::CppType; |
106 | | int event_count = 0; |
107 | | int64_t window; |
108 | | bool enable_mode; |
109 | | WindowFunnelMode window_funnel_mode; |
110 | | DataValue<T> events_list; |
111 | | |
112 | 34 | WindowFunnelState() { |
113 | 34 | event_count = 0; |
114 | 34 | window = 0; |
115 | 34 | window_funnel_mode = WindowFunnelMode::INVALID; |
116 | 34 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EEC2Ev Line | Count | Source | 112 | 34 | WindowFunnelState() { | 113 | 34 | event_count = 0; | 114 | 34 | window = 0; | 115 | 34 | window_funnel_mode = WindowFunnelMode::INVALID; | 116 | 34 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EEC2Ev |
117 | 34 | WindowFunnelState(int arg_event_count) : WindowFunnelState() { |
118 | 34 | event_count = arg_event_count; |
119 | 34 | events_list.event_columns_data.resize(event_count); |
120 | 34 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EEC2Ei Line | Count | Source | 117 | 34 | WindowFunnelState(int arg_event_count) : WindowFunnelState() { | 118 | 34 | event_count = arg_event_count; | 119 | 34 | events_list.event_columns_data.resize(event_count); | 120 | 34 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EEC2Ei |
121 | | |
122 | 0 | void reset() { events_list.clear(); }Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5resetEv Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5resetEv |
123 | | |
124 | 84 | void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) { |
125 | 84 | window = win; |
126 | 84 | window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; |
127 | 84 | events_list.dt.emplace_back( |
128 | 84 | assert_cast<const typename PrimitiveTypeTraits<PType>::ColumnType&>(*arg_columns[2]) |
129 | 84 | .get_data()[row_num]); |
130 | 420 | for (int i = 0; i < event_count; i++) { |
131 | 336 | events_list.event_columns_data[i].emplace_back( |
132 | 336 | assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]); |
133 | 336 | } |
134 | 84 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE3addEPPKNS_7IColumnEllNS_16WindowFunnelModeE Line | Count | Source | 124 | 84 | void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) { | 125 | 84 | window = win; | 126 | 84 | window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; | 127 | 84 | events_list.dt.emplace_back( | 128 | 84 | assert_cast<const typename PrimitiveTypeTraits<PType>::ColumnType&>(*arg_columns[2]) | 129 | 84 | .get_data()[row_num]); | 130 | 420 | for (int i = 0; i < event_count; i++) { | 131 | 336 | events_list.event_columns_data[i].emplace_back( | 132 | 336 | assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]); | 133 | 336 | } | 134 | 84 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE3addEPPKNS_7IColumnEllNS_16WindowFunnelModeE |
135 | | |
136 | | // todo: rethink thid sort method. |
137 | 24 | void sort() { |
138 | 24 | auto num = events_list.size(); |
139 | 24 | std::vector<size_t> indices(num); |
140 | 24 | std::iota(indices.begin(), indices.end(), 0); |
141 | 24 | std::sort(indices.begin(), indices.end(), |
142 | 102 | [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });_ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlmmE_clEmm Line | Count | Source | 142 | 102 | [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; }); |
Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlmmE_clEmm |
143 | | |
144 | 120 | auto reorder = [&indices, &num](auto& vec) { |
145 | 120 | std::decay_t<decltype(vec)> temp; |
146 | 120 | temp.resize(num); |
147 | 560 | for (auto i = 0; i < num; i++) { |
148 | 440 | temp[i] = vec[indices[i]]; |
149 | 440 | } |
150 | 120 | std::swap(vec, temp); |
151 | 120 | }; _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlRT_E_clISt6vectorINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEESaISA_EEEEDaS4_ Line | Count | Source | 144 | 24 | auto reorder = [&indices, &num](auto& vec) { | 145 | 24 | std::decay_t<decltype(vec)> temp; | 146 | 24 | temp.resize(num); | 147 | 112 | for (auto i = 0; i < num; i++) { | 148 | 88 | temp[i] = vec[indices[i]]; | 149 | 88 | } | 150 | 24 | std::swap(vec, temp); | 151 | 24 | }; |
_ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS4_ Line | Count | Source | 144 | 96 | auto reorder = [&indices, &num](auto& vec) { | 145 | 96 | std::decay_t<decltype(vec)> temp; | 146 | 96 | temp.resize(num); | 147 | 448 | for (auto i = 0; i < num; i++) { | 148 | 352 | temp[i] = vec[indices[i]]; | 149 | 352 | } | 150 | 96 | std::swap(vec, temp); | 151 | 96 | }; |
Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlRT_E_clISt6vectorINS_16TimestampTzValueESaIS8_EEEEDaS4_ Unexecuted instantiation: _ZZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS4_ |
152 | | |
153 | 24 | reorder(events_list.dt); |
154 | 96 | for (auto& inner_vec : events_list.event_columns_data) { |
155 | 96 | reorder(inner_vec); |
156 | 96 | } |
157 | 24 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4sortEv Line | Count | Source | 137 | 24 | void sort() { | 138 | 24 | auto num = events_list.size(); | 139 | 24 | std::vector<size_t> indices(num); | 140 | 24 | std::iota(indices.begin(), indices.end(), 0); | 141 | 24 | std::sort(indices.begin(), indices.end(), | 142 | 24 | [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; }); | 143 | | | 144 | 24 | auto reorder = [&indices, &num](auto& vec) { | 145 | 24 | std::decay_t<decltype(vec)> temp; | 146 | 24 | temp.resize(num); | 147 | 24 | for (auto i = 0; i < num; i++) { | 148 | 24 | temp[i] = vec[indices[i]]; | 149 | 24 | } | 150 | 24 | std::swap(vec, temp); | 151 | 24 | }; | 152 | | | 153 | 24 | reorder(events_list.dt); | 154 | 96 | for (auto& inner_vec : events_list.event_columns_data) { | 155 | 96 | reorder(inner_vec); | 156 | 96 | } | 157 | 24 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4sortEv |
158 | | |
159 | | template <WindowFunnelMode WINDOW_FUNNEL_MODE> |
160 | 36 | int _match_event_list(size_t& start_row, size_t row_count) const { |
161 | 36 | int matched_count = 0; |
162 | 36 | DateValueType end_timestamp; |
163 | | |
164 | 36 | if (window < 0) { |
165 | 0 | throw Exception(ErrorCode::INVALID_ARGUMENT, |
166 | 0 | "the sliding time window must be a positive integer, but got: {}", |
167 | 0 | window); |
168 | 0 | } |
169 | 36 | TimeInterval interval(SECOND, window, false); |
170 | 36 | int column_idx = 0; |
171 | 36 | const auto& timestamp_data = events_list.dt; |
172 | 36 | const auto& first_event_data = events_list.event_columns_data[column_idx].data(); |
173 | 36 | auto match_row = simd::find_one(first_event_data, start_row, row_count); |
174 | 36 | start_row = match_row + 1; |
175 | 36 | if (match_row < row_count) { |
176 | 22 | auto prev_timestamp = timestamp_data[match_row]; |
177 | 22 | end_timestamp = prev_timestamp; |
178 | 22 | end_timestamp.template date_add_interval<SECOND>(interval); |
179 | | |
180 | 22 | matched_count++; |
181 | 22 | column_idx++; |
182 | 22 | auto last_match_row = match_row; |
183 | 22 | ++match_row; |
184 | 62 | for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) { |
185 | 54 | const auto& event_data = events_list.event_columns_data[column_idx]; |
186 | 54 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) { |
187 | 0 | if (event_data[match_row] == 1) { |
188 | 0 | auto current_timestamp = timestamp_data[match_row]; |
189 | 0 | if (current_timestamp <= end_timestamp) { |
190 | 0 | matched_count++; |
191 | 0 | continue; |
192 | 0 | } |
193 | 0 | } |
194 | 0 | break; |
195 | 0 | } |
196 | 0 | match_row = simd::find_one(event_data.data(), match_row, row_count); |
197 | 54 | if (match_row < row_count) { |
198 | 54 | auto current_timestamp = timestamp_data[match_row]; |
199 | 54 | bool is_matched = current_timestamp <= end_timestamp; |
200 | 54 | if (is_matched) { |
201 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { |
202 | 0 | is_matched = current_timestamp > prev_timestamp; |
203 | 0 | } |
204 | 40 | } |
205 | 54 | if (!is_matched) { |
206 | 14 | break; |
207 | 14 | } |
208 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { |
209 | 0 | prev_timestamp = timestamp_data[match_row]; |
210 | 0 | } |
211 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) { |
212 | 0 | bool is_dup = false; |
213 | 0 | if (match_row != last_match_row + 1) { |
214 | 0 | for (int tmp_column_idx = 0; tmp_column_idx < column_idx; |
215 | 0 | tmp_column_idx++) { |
216 | 0 | const auto& tmp_event_data = |
217 | 0 | events_list.event_columns_data[tmp_column_idx].data(); |
218 | 0 | auto dup_match_row = simd::find_one(tmp_event_data, |
219 | 0 | last_match_row + 1, match_row); |
220 | 0 | if (dup_match_row < match_row) { |
221 | 0 | is_dup = true; |
222 | 0 | break; |
223 | 0 | } |
224 | 0 | } |
225 | 0 | } |
226 | 0 | if (is_dup) { |
227 | 0 | break; |
228 | 0 | } |
229 | 0 | last_match_row = match_row; |
230 | 0 | } |
231 | 0 | matched_count++; |
232 | 40 | } else { |
233 | 0 | break; |
234 | 0 | } |
235 | 54 | } |
236 | 22 | } |
237 | 0 | return matched_count; |
238 | 0 | } _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm Line | Count | Source | 160 | 36 | int _match_event_list(size_t& start_row, size_t row_count) const { | 161 | 36 | int matched_count = 0; | 162 | 36 | DateValueType end_timestamp; | 163 | | | 164 | 36 | if (window < 0) { | 165 | 0 | throw Exception(ErrorCode::INVALID_ARGUMENT, | 166 | 0 | "the sliding time window must be a positive integer, but got: {}", | 167 | 0 | window); | 168 | 0 | } | 169 | 36 | TimeInterval interval(SECOND, window, false); | 170 | 36 | int column_idx = 0; | 171 | 36 | const auto& timestamp_data = events_list.dt; | 172 | 36 | const auto& first_event_data = events_list.event_columns_data[column_idx].data(); | 173 | 36 | auto match_row = simd::find_one(first_event_data, start_row, row_count); | 174 | 36 | start_row = match_row + 1; | 175 | 36 | if (match_row < row_count) { | 176 | 22 | auto prev_timestamp = timestamp_data[match_row]; | 177 | 22 | end_timestamp = prev_timestamp; | 178 | 22 | end_timestamp.template date_add_interval<SECOND>(interval); | 179 | | | 180 | 22 | matched_count++; | 181 | 22 | column_idx++; | 182 | 22 | auto last_match_row = match_row; | 183 | 22 | ++match_row; | 184 | 62 | for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) { | 185 | 54 | const auto& event_data = events_list.event_columns_data[column_idx]; | 186 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) { | 187 | | if (event_data[match_row] == 1) { | 188 | | auto current_timestamp = timestamp_data[match_row]; | 189 | | if (current_timestamp <= end_timestamp) { | 190 | | matched_count++; | 191 | | continue; | 192 | | } | 193 | | } | 194 | | break; | 195 | | } | 196 | 54 | match_row = simd::find_one(event_data.data(), match_row, row_count); | 197 | 54 | if (match_row < row_count) { | 198 | 54 | auto current_timestamp = timestamp_data[match_row]; | 199 | 54 | bool is_matched = current_timestamp <= 
end_timestamp; | 200 | 54 | if (is_matched) { | 201 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { | 202 | | is_matched = current_timestamp > prev_timestamp; | 203 | | } | 204 | 40 | } | 205 | 54 | if (!is_matched) { | 206 | 14 | break; | 207 | 14 | } | 208 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { | 209 | | prev_timestamp = timestamp_data[match_row]; | 210 | | } | 211 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) { | 212 | | bool is_dup = false; | 213 | | if (match_row != last_match_row + 1) { | 214 | | for (int tmp_column_idx = 0; tmp_column_idx < column_idx; | 215 | | tmp_column_idx++) { | 216 | | const auto& tmp_event_data = | 217 | | events_list.event_columns_data[tmp_column_idx].data(); | 218 | | auto dup_match_row = simd::find_one(tmp_event_data, | 219 | | last_match_row + 1, match_row); | 220 | | if (dup_match_row < match_row) { | 221 | | is_dup = true; | 222 | | break; | 223 | | } | 224 | | } | 225 | | } | 226 | | if (is_dup) { | 227 | | break; | 228 | | } | 229 | | last_match_row = match_row; | 230 | | } | 231 | 40 | matched_count++; | 232 | 40 | } else { | 233 | 0 | break; | 234 | 0 | } | 235 | 54 | } | 236 | 22 | } | 237 | 36 | return matched_count; | 238 | 36 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm |
239 | | |
240 | | template <WindowFunnelMode WINDOW_FUNNEL_MODE> |
241 | 22 | int _get_internal() const { |
242 | 22 | size_t start_row = 0; |
243 | 22 | int max_found_event_count = 0; |
244 | 22 | auto row_count = events_list.size(); |
245 | 50 | while (start_row < row_count) { |
246 | 36 | auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count); |
247 | 36 | if (found_event_count == event_count) { |
248 | 8 | return found_event_count; |
249 | 8 | } |
250 | 28 | max_found_event_count = std::max(max_found_event_count, found_event_count); |
251 | 28 | } |
252 | 14 | return max_found_event_count; |
253 | 22 | } _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE1EEEiv Line | Count | Source | 241 | 22 | int _get_internal() const { | 242 | 22 | size_t start_row = 0; | 243 | 22 | int max_found_event_count = 0; | 244 | 22 | auto row_count = events_list.size(); | 245 | 50 | while (start_row < row_count) { | 246 | 36 | auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count); | 247 | 36 | if (found_event_count == event_count) { | 248 | 8 | return found_event_count; | 249 | 8 | } | 250 | 28 | max_found_event_count = std::max(max_found_event_count, found_event_count); | 251 | 28 | } | 252 | 14 | return max_found_event_count; | 253 | 22 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE2EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE3EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE13_get_internalILNS_16WindowFunnelModeE4EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE1EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE2EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE3EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE13_get_internalILNS_16WindowFunnelModeE4EEEiv |
254 | 24 | int get() const { |
255 | 24 | auto row_count = events_list.size(); |
256 | 24 | if (event_count == 0 || row_count == 0) { |
257 | 2 | return 0; |
258 | 2 | } |
259 | 22 | switch (window_funnel_mode) { |
260 | 22 | case WindowFunnelMode::DEFAULT: |
261 | 22 | return _get_internal<WindowFunnelMode::DEFAULT>(); |
262 | 0 | case WindowFunnelMode::DEDUPLICATION: |
263 | 0 | return _get_internal<WindowFunnelMode::DEDUPLICATION>(); |
264 | 0 | case WindowFunnelMode::FIXED: |
265 | 0 | return _get_internal<WindowFunnelMode::FIXED>(); |
266 | 0 | case WindowFunnelMode::INCREASE: |
267 | 0 | return _get_internal<WindowFunnelMode::INCREASE>(); |
268 | 0 | default: |
269 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode"); |
270 | 0 | return 0; |
271 | 22 | } |
272 | 22 | } _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE3getEv Line | Count | Source | 254 | 24 | int get() const { | 255 | 24 | auto row_count = events_list.size(); | 256 | 24 | if (event_count == 0 || row_count == 0) { | 257 | 2 | return 0; | 258 | 2 | } | 259 | 22 | switch (window_funnel_mode) { | 260 | 22 | case WindowFunnelMode::DEFAULT: | 261 | 22 | return _get_internal<WindowFunnelMode::DEFAULT>(); | 262 | 0 | case WindowFunnelMode::DEDUPLICATION: | 263 | 0 | return _get_internal<WindowFunnelMode::DEDUPLICATION>(); | 264 | 0 | case WindowFunnelMode::FIXED: | 265 | 0 | return _get_internal<WindowFunnelMode::FIXED>(); | 266 | 0 | case WindowFunnelMode::INCREASE: | 267 | 0 | return _get_internal<WindowFunnelMode::INCREASE>(); | 268 | 0 | default: | 269 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode"); | 270 | 0 | return 0; | 271 | 22 | } | 272 | 22 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE3getEv |
273 | | |
274 | 11 | void merge(const WindowFunnelState<T>& other) { |
275 | 11 | if (other.events_list.empty()) { |
276 | 1 | return; |
277 | 1 | } |
278 | 10 | events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt), |
279 | 10 | std::end(other.events_list.dt)); |
280 | 50 | for (size_t i = 0; i < event_count; i++) { |
281 | 40 | events_list.event_columns_data[i].insert( |
282 | 40 | std::end(events_list.event_columns_data[i]), |
283 | 40 | std::begin(other.events_list.event_columns_data[i]), |
284 | 40 | std::end(other.events_list.event_columns_data[i])); |
285 | 40 | } |
286 | 10 | event_count = event_count > 0 ? event_count : other.event_count; |
287 | 10 | window = window > 0 ? window : other.window; |
288 | 10 | if (enable_mode) { |
289 | 0 | window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID |
290 | 0 | ? other.window_funnel_mode |
291 | 0 | : window_funnel_mode; |
292 | 10 | } else { |
293 | 10 | window_funnel_mode = WindowFunnelMode::DEFAULT; |
294 | 10 | } |
295 | 10 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5mergeERKS2_ Line | Count | Source | 274 | 11 | void merge(const WindowFunnelState<T>& other) { | 275 | 11 | if (other.events_list.empty()) { | 276 | 1 | return; | 277 | 1 | } | 278 | 10 | events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt), | 279 | 10 | std::end(other.events_list.dt)); | 280 | 50 | for (size_t i = 0; i < event_count; i++) { | 281 | 40 | events_list.event_columns_data[i].insert( | 282 | 40 | std::end(events_list.event_columns_data[i]), | 283 | 40 | std::begin(other.events_list.event_columns_data[i]), | 284 | 40 | std::end(other.events_list.event_columns_data[i])); | 285 | 40 | } | 286 | 10 | event_count = event_count > 0 ? event_count : other.event_count; | 287 | 10 | window = window > 0 ? window : other.window; | 288 | 10 | if (enable_mode) { | 289 | 0 | window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID | 290 | 0 | ? other.window_funnel_mode | 291 | 0 | : window_funnel_mode; | 292 | 10 | } else { | 293 | 10 | window_funnel_mode = WindowFunnelMode::DEFAULT; | 294 | 10 | } | 295 | 10 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5mergeERKS2_ |
296 | | |
297 | 2 | void write(BufferWritable& out) const { |
298 | 2 | write_var_int(event_count, out); |
299 | 2 | write_var_int(window, out); |
300 | 2 | if (enable_mode) { |
301 | 0 | write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), |
302 | 0 | out); |
303 | 0 | } |
304 | 2 | auto size = events_list.size(); |
305 | 2 | write_var_int(size, out); |
306 | 4 | for (const auto& timestamp : events_list.dt) { |
307 | 4 | write_var_int(timestamp.to_date_int_val(), out); |
308 | 4 | } |
309 | 10 | for (int64_t i = 0; i < event_count; i++) { |
310 | 8 | const auto& event_columns_data = events_list.event_columns_data[i]; |
311 | 16 | for (auto event : event_columns_data) { |
312 | 16 | write_var_int(event, out); |
313 | 16 | } |
314 | 8 | } |
315 | 2 | } _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE5writeERNS_14BufferWritableE Line | Count | Source | 297 | 2 | void write(BufferWritable& out) const { | 298 | 2 | write_var_int(event_count, out); | 299 | 2 | write_var_int(window, out); | 300 | 2 | if (enable_mode) { | 301 | 0 | write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), | 302 | 0 | out); | 303 | 0 | } | 304 | 2 | auto size = events_list.size(); | 305 | 2 | write_var_int(size, out); | 306 | 4 | for (const auto& timestamp : events_list.dt) { | 307 | 4 | write_var_int(timestamp.to_date_int_val(), out); | 308 | 4 | } | 309 | 10 | for (int64_t i = 0; i < event_count; i++) { | 310 | 8 | const auto& event_columns_data = events_list.event_columns_data[i]; | 311 | 16 | for (auto event : event_columns_data) { | 312 | 16 | write_var_int(event, out); | 313 | 16 | } | 314 | 8 | } | 315 | 2 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE5writeERNS_14BufferWritableE |
316 | | |
317 | 2 | void read(BufferReadable& in) { |
318 | 2 | int64_t event_level; |
319 | 2 | read_var_int(event_level, in); |
320 | 2 | event_count = (int)event_level; |
321 | 2 | read_var_int(window, in); |
322 | 2 | window_funnel_mode = WindowFunnelMode::DEFAULT; |
323 | 2 | if (enable_mode) { |
324 | 0 | int64_t mode; |
325 | 0 | read_var_int(mode, in); |
326 | 0 | window_funnel_mode = static_cast<WindowFunnelMode>(mode); |
327 | 0 | } |
328 | 2 | int64_t size = 0; |
329 | 2 | read_var_int(size, in); |
330 | 2 | events_list.clear(); |
331 | 2 | events_list.dt.resize(size); |
332 | 6 | for (auto i = 0; i < size; i++) { |
333 | 4 | Int64 timestamp = 0; |
334 | 4 | read_var_int(timestamp, in); |
335 | 4 | events_list.dt[i] = DateValueType(static_cast<UInt64>(timestamp)); |
336 | 4 | } |
337 | 2 | events_list.event_columns_data.resize(event_count); |
338 | 10 | for (int64_t i = 0; i < event_count; i++) { |
339 | 8 | auto& event_columns_data = events_list.event_columns_data[i]; |
340 | 8 | event_columns_data.resize(size); |
341 | 24 | for (auto j = 0; j < size; j++) { |
342 | 16 | Int64 temp_value; |
343 | 16 | read_var_int(temp_value, in); |
344 | 16 | event_columns_data[j] = static_cast<UInt8>(temp_value); |
345 | 16 | } |
346 | 8 | } |
347 | 2 | } _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE26EE4readERNS_14BufferReadableE Line | Count | Source | 317 | 2 | void read(BufferReadable& in) { | 318 | 2 | int64_t event_level; | 319 | 2 | read_var_int(event_level, in); | 320 | 2 | event_count = (int)event_level; | 321 | 2 | read_var_int(window, in); | 322 | 2 | window_funnel_mode = WindowFunnelMode::DEFAULT; | 323 | 2 | if (enable_mode) { | 324 | 0 | int64_t mode; | 325 | 0 | read_var_int(mode, in); | 326 | 0 | window_funnel_mode = static_cast<WindowFunnelMode>(mode); | 327 | 0 | } | 328 | 2 | int64_t size = 0; | 329 | 2 | read_var_int(size, in); | 330 | 2 | events_list.clear(); | 331 | 2 | events_list.dt.resize(size); | 332 | 6 | for (auto i = 0; i < size; i++) { | 333 | 4 | Int64 timestamp = 0; | 334 | 4 | read_var_int(timestamp, in); | 335 | 4 | events_list.dt[i] = DateValueType(static_cast<UInt64>(timestamp)); | 336 | 4 | } | 337 | 2 | events_list.event_columns_data.resize(event_count); | 338 | 10 | for (int64_t i = 0; i < event_count; i++) { | 339 | 8 | auto& event_columns_data = events_list.event_columns_data[i]; | 340 | 8 | event_columns_data.resize(size); | 341 | 24 | for (auto j = 0; j < size; j++) { | 342 | 16 | Int64 temp_value; | 343 | 16 | read_var_int(temp_value, in); | 344 | 16 | event_columns_data[j] = static_cast<UInt8>(temp_value); | 345 | 16 | } | 346 | 8 | } | 347 | 2 | } |
Unexecuted instantiation: _ZN5doris17WindowFunnelStateILNS_13PrimitiveTypeE42EE4readERNS_14BufferReadableE |
348 | | }; |
349 | | |
350 | | template <PrimitiveType T> |
351 | | class AggregateFunctionWindowFunnel final |
352 | | : public IAggregateFunctionDataHelper<WindowFunnelState<T>, |
353 | | AggregateFunctionWindowFunnel<T>>, |
354 | | MultiExpression, |
355 | | NullableAggregateFunction { |
356 | | public: |
357 | | AggregateFunctionWindowFunnel(const DataTypes& argument_types_) |
358 | 6 | : IAggregateFunctionDataHelper<WindowFunnelState<T>, AggregateFunctionWindowFunnel<T>>( |
359 | 6 | argument_types_) {}_ZN5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EEC2ERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE Line | Count | Source | 358 | 6 | : IAggregateFunctionDataHelper<WindowFunnelState<T>, AggregateFunctionWindowFunnel<T>>( | 359 | 6 | argument_types_) {} |
Unexecuted instantiation: _ZN5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EEC2ERKSt6vectorISt10shared_ptrIKNS_9IDataTypeEESaIS7_EE |
360 | | |
361 | 34 | void create(AggregateDataPtr __restrict place) const override { |
362 | 34 | auto data = new (place) WindowFunnelState<T>( |
363 | 34 | cast_set<int>(IAggregateFunction::get_argument_types().size() - 3)); |
364 | | /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` |
365 | 34 | data->enable_mode = IAggregateFunction::version >= 3; |
366 | 34 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE6createEPc Line | Count | Source | 361 | 34 | void create(AggregateDataPtr __restrict place) const override { | 362 | 34 | auto data = new (place) WindowFunnelState<T>( | 363 | 34 | cast_set<int>(IAggregateFunction::get_argument_types().size() - 3)); | 364 | | /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` | 365 | 34 | data->enable_mode = IAggregateFunction::version >= 3; | 366 | 34 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE6createEPc |
367 | | |
368 | 0 | String get_name() const override { return "window_funnel"; }Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE8get_nameB5cxx11Ev Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE8get_nameB5cxx11Ev |
369 | | |
370 | 0 | DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); }Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE15get_return_typeEv Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE15get_return_typeEv |
371 | | |
372 | 0 | void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE5resetEPc Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE5resetEPc |
373 | | |
374 | | void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, |
375 | 84 | Arena&) const override { |
376 | 84 | const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num]; |
377 | 84 | StringRef mode = columns[1]->get_data_at(row_num); |
378 | 84 | this->data(place).add(columns, row_num, window, |
379 | 84 | string_to_window_funnel_mode(mode.to_string())); |
380 | 84 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE3addEPcPPKNS_7IColumnElRNS_5ArenaE Line | Count | Source | 375 | 84 | Arena&) const override { | 376 | 84 | const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num]; | 377 | 84 | StringRef mode = columns[1]->get_data_at(row_num); | 378 | 84 | this->data(place).add(columns, row_num, window, | 379 | 84 | string_to_window_funnel_mode(mode.to_string())); | 380 | 84 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE3addEPcPPKNS_7IColumnElRNS_5ArenaE |
381 | | |
382 | | void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, |
383 | 11 | Arena&) const override { |
384 | 11 | this->data(place).merge(this->data(rhs)); |
385 | 11 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE5mergeEPcPKcRNS_5ArenaE Line | Count | Source | 383 | 11 | Arena&) const override { | 384 | 11 | this->data(place).merge(this->data(rhs)); | 385 | 11 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE5mergeEPcPKcRNS_5ArenaE |
386 | | |
387 | 2 | void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { |
388 | 2 | this->data(place).write(buf); |
389 | 2 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE9serializeEPKcRNS_14BufferWritableE Line | Count | Source | 387 | 2 | void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { | 388 | 2 | this->data(place).write(buf); | 389 | 2 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE9serializeEPKcRNS_14BufferWritableE |
390 | | |
391 | | void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, |
392 | 2 | Arena&) const override { |
393 | 2 | this->data(place).read(buf); |
394 | 2 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE11deserializeEPcRNS_14BufferReadableERNS_5ArenaE Line | Count | Source | 392 | 2 | Arena&) const override { | 393 | 2 | this->data(place).read(buf); | 394 | 2 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE11deserializeEPcRNS_14BufferReadableERNS_5ArenaE |
395 | | |
396 | 24 | void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { |
397 | | // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr. |
398 | 24 | this->data(const_cast<AggregateDataPtr>(place)).sort(); |
399 | 24 | assert_cast<ColumnInt32&>(to).get_data().push_back( |
400 | 24 | IAggregateFunctionDataHelper<WindowFunnelState<T>, |
401 | 24 | AggregateFunctionWindowFunnel<T>>::data(place) |
402 | 24 | .get()); |
403 | 24 | } _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE26EE18insert_result_intoEPKcRNS_7IColumnE Line | Count | Source | 396 | 24 | void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { | 397 | | // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr. | 398 | 24 | this->data(const_cast<AggregateDataPtr>(place)).sort(); | 399 | 24 | assert_cast<ColumnInt32&>(to).get_data().push_back( | 400 | 24 | IAggregateFunctionDataHelper<WindowFunnelState<T>, | 401 | 24 | AggregateFunctionWindowFunnel<T>>::data(place) | 402 | 24 | .get()); | 403 | 24 | } |
Unexecuted instantiation: _ZNK5doris29AggregateFunctionWindowFunnelILNS_13PrimitiveTypeE42EE18insert_result_intoEPKcRNS_7IColumnE |
404 | | |
405 | | protected: |
406 | | using IAggregateFunction::version; |
407 | | }; |
408 | | } // namespace doris |