be/src/exprs/aggregate/aggregate_function_window_funnel.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // This file is copied from |
19 | | // https://github.com/ClickHouse/ClickHouse/blob/master/AggregateFunctionWindowFunnel.h |
20 | | // and modified by Doris |
21 | | |
22 | | #pragma once |
23 | | |
24 | | #include <gen_cpp/data.pb.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <boost/iterator/iterator_facade.hpp> |
28 | | #include <iterator> |
29 | | #include <memory> |
30 | | #include <type_traits> |
31 | | #include <utility> |
32 | | |
33 | | #include "common/cast_set.h" |
34 | | #include "common/exception.h" |
35 | | #include "common/status.h" |
36 | | #include "core/assert_cast.h" |
37 | | #include "core/binary_cast.hpp" |
38 | | #include "core/column/column_string.h" |
39 | | #include "core/data_type/data_type_number.h" |
40 | | #include "core/types.h" |
41 | | #include "core/value/vdatetime_value.h" |
42 | | #include "exec/sort/sort_block.h" |
43 | | #include "exprs/aggregate/aggregate_function.h" |
44 | | #include "util/simd/bits.h" |
45 | | #include "util/var_int.h" |
46 | | |
47 | | namespace doris { |
48 | | #include "common/compile_check_begin.h" |
49 | | class Arena; |
50 | | class BufferReadable; |
51 | | class BufferWritable; |
52 | | class IColumn; |
53 | | } // namespace doris |
54 | | |
55 | | namespace doris { |
56 | | |
57 | | enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE }; |
58 | | |
59 | 84 | WindowFunnelMode string_to_window_funnel_mode(const String& string) { |
60 | 84 | if (string == "default") { |
61 | 0 | return WindowFunnelMode::DEFAULT; |
62 | 84 | } else if (string == "deduplication") { |
63 | 0 | return WindowFunnelMode::DEDUPLICATION; |
64 | 84 | } else if (string == "fixed") { |
65 | 0 | return WindowFunnelMode::FIXED; |
66 | 84 | } else if (string == "increase") { |
67 | 0 | return WindowFunnelMode::INCREASE; |
68 | 84 | } else { |
69 | 84 | return WindowFunnelMode::INVALID; |
70 | 84 | } |
71 | 84 | } |
72 | | |
73 | | struct DataValue { |
74 | | using TimestampEvent = std::vector<ColumnUInt8::Container>; |
75 | | std::vector<DateV2Value<DateTimeV2ValueType>> dt; |
76 | | TimestampEvent event_columns_data; |
77 | 0 | bool operator<(const DataValue& other) const { return dt < other.dt; } |
78 | 2 | void clear() { |
79 | 2 | dt.clear(); |
80 | 8 | for (auto& data : event_columns_data) { |
81 | 8 | data.clear(); |
82 | 8 | } |
83 | 2 | } |
84 | 72 | auto size() const { return dt.size(); } |
85 | 11 | bool empty() const { return dt.empty(); } |
86 | 0 | std::string debug_string() const { |
87 | 0 | std::string result = "\n" + std::to_string(dt.size()) + " " + |
88 | 0 | std::to_string(event_columns_data[0].size()) + "\n"; |
89 | 0 | for (size_t i = 0; i < dt.size(); ++i) { |
90 | 0 | result += dt[i].debug_string() + " ,"; |
91 | 0 | for (const auto& event : event_columns_data) { |
92 | 0 | result += std::to_string(event[i]) + ","; |
93 | 0 | } |
94 | 0 | result += "\n"; |
95 | 0 | } |
96 | 0 | return result; |
97 | 0 | } |
98 | | }; |
99 | | |
100 | | struct WindowFunnelState { |
101 | | static constexpr PrimitiveType PType = PrimitiveType::TYPE_DATETIMEV2; |
102 | | using NativeType = UInt64; |
103 | | using DateValueType = DateV2Value<DateTimeV2ValueType>; |
104 | | int event_count = 0; |
105 | | int64_t window; |
106 | | bool enable_mode; |
107 | | WindowFunnelMode window_funnel_mode; |
108 | | DataValue events_list; |
109 | | |
110 | 34 | WindowFunnelState() { |
111 | 34 | event_count = 0; |
112 | 34 | window = 0; |
113 | 34 | window_funnel_mode = WindowFunnelMode::INVALID; |
114 | 34 | } |
115 | 34 | WindowFunnelState(int arg_event_count) : WindowFunnelState() { |
116 | 34 | event_count = arg_event_count; |
117 | 34 | events_list.event_columns_data.resize(event_count); |
118 | 34 | } |
119 | | |
120 | 0 | void reset() { events_list.clear(); } |
121 | | |
122 | 84 | void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) { |
123 | 84 | window = win; |
124 | 84 | window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT; |
125 | 84 | events_list.dt.emplace_back( |
126 | 84 | assert_cast<const ColumnVector<PType>&>(*arg_columns[2]).get_data()[row_num]); |
127 | 420 | for (int i = 0; i < event_count; i++) { |
128 | 336 | events_list.event_columns_data[i].emplace_back( |
129 | 336 | assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]); |
130 | 336 | } |
131 | 84 | } |
132 | | |
133 | | // todo: rethink thid sort method. |
134 | 24 | void sort() { |
135 | 24 | auto num = events_list.size(); |
136 | 24 | std::vector<size_t> indices(num); |
137 | 24 | std::iota(indices.begin(), indices.end(), 0); |
138 | 24 | std::sort(indices.begin(), indices.end(), |
139 | 102 | [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; }); |
140 | | |
141 | 120 | auto reorder = [&indices, &num](auto& vec) { |
142 | 120 | std::decay_t<decltype(vec)> temp; |
143 | 120 | temp.resize(num); |
144 | 560 | for (auto i = 0; i < num; i++) { |
145 | 440 | temp[i] = vec[indices[i]]; |
146 | 440 | } |
147 | 120 | std::swap(vec, temp); |
148 | 120 | }; _ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clISt6vectorINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEESaIS8_EEEEDaS2_ Line | Count | Source | 141 | 24 | auto reorder = [&indices, &num](auto& vec) { | 142 | 24 | std::decay_t<decltype(vec)> temp; | 143 | 24 | temp.resize(num); | 144 | 112 | for (auto i = 0; i < num; i++) { | 145 | 88 | temp[i] = vec[indices[i]]; | 146 | 88 | } | 147 | 24 | std::swap(vec, temp); | 148 | 24 | }; |
_ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS2_ Line | Count | Source | 141 | 96 | auto reorder = [&indices, &num](auto& vec) { | 142 | 96 | std::decay_t<decltype(vec)> temp; | 143 | 96 | temp.resize(num); | 144 | 448 | for (auto i = 0; i < num; i++) { | 145 | 352 | temp[i] = vec[indices[i]]; | 146 | 352 | } | 147 | 96 | std::swap(vec, temp); | 148 | 96 | }; |
|
149 | | |
150 | 24 | reorder(events_list.dt); |
151 | 96 | for (auto& inner_vec : events_list.event_columns_data) { |
152 | 96 | reorder(inner_vec); |
153 | 96 | } |
154 | 24 | } |
155 | | |
156 | | template <WindowFunnelMode WINDOW_FUNNEL_MODE> |
157 | 36 | int _match_event_list(size_t& start_row, size_t row_count) const { |
158 | 36 | int matched_count = 0; |
159 | 36 | DateValueType end_timestamp; |
160 | | |
161 | 36 | if (window < 0) { |
162 | 0 | throw Exception(ErrorCode::INVALID_ARGUMENT, |
163 | 0 | "the sliding time window must be a positive integer, but got: {}", |
164 | 0 | window); |
165 | 0 | } |
166 | 36 | TimeInterval interval(SECOND, window, false); |
167 | 36 | int column_idx = 0; |
168 | 36 | const auto& timestamp_data = events_list.dt; |
169 | 36 | const auto& first_event_data = events_list.event_columns_data[column_idx].data(); |
170 | 36 | auto match_row = simd::find_one(first_event_data, start_row, row_count); |
171 | 36 | start_row = match_row + 1; |
172 | 36 | if (match_row < row_count) { |
173 | 22 | auto prev_timestamp = timestamp_data[match_row]; |
174 | 22 | end_timestamp = prev_timestamp; |
175 | 22 | end_timestamp.template date_add_interval<SECOND>(interval); |
176 | | |
177 | 22 | matched_count++; |
178 | 22 | column_idx++; |
179 | 22 | auto last_match_row = match_row; |
180 | 22 | ++match_row; |
181 | 62 | for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) { |
182 | 54 | const auto& event_data = events_list.event_columns_data[column_idx]; |
183 | 54 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) { |
184 | 0 | if (event_data[match_row] == 1) { |
185 | 0 | auto current_timestamp = timestamp_data[match_row]; |
186 | 0 | if (current_timestamp <= end_timestamp) { |
187 | 0 | matched_count++; |
188 | 0 | continue; |
189 | 0 | } |
190 | 0 | } |
191 | 0 | break; |
192 | 0 | } |
193 | 0 | match_row = simd::find_one(event_data.data(), match_row, row_count); |
194 | 54 | if (match_row < row_count) { |
195 | 54 | auto current_timestamp = timestamp_data[match_row]; |
196 | 54 | bool is_matched = current_timestamp <= end_timestamp; |
197 | 54 | if (is_matched) { |
198 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { |
199 | 0 | is_matched = current_timestamp > prev_timestamp; |
200 | 0 | } |
201 | 40 | } |
202 | 54 | if (!is_matched) { |
203 | 14 | break; |
204 | 14 | } |
205 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { |
206 | 0 | prev_timestamp = timestamp_data[match_row]; |
207 | 0 | } |
208 | 40 | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) { |
209 | 0 | bool is_dup = false; |
210 | 0 | if (match_row != last_match_row + 1) { |
211 | 0 | for (int tmp_column_idx = 0; tmp_column_idx < column_idx; |
212 | 0 | tmp_column_idx++) { |
213 | 0 | const auto& tmp_event_data = |
214 | 0 | events_list.event_columns_data[tmp_column_idx].data(); |
215 | 0 | auto dup_match_row = simd::find_one(tmp_event_data, |
216 | 0 | last_match_row + 1, match_row); |
217 | 0 | if (dup_match_row < match_row) { |
218 | 0 | is_dup = true; |
219 | 0 | break; |
220 | 0 | } |
221 | 0 | } |
222 | 0 | } |
223 | 0 | if (is_dup) { |
224 | 0 | break; |
225 | 0 | } |
226 | 0 | last_match_row = match_row; |
227 | 0 | } |
228 | 0 | matched_count++; |
229 | 40 | } else { |
230 | 0 | break; |
231 | 0 | } |
232 | 54 | } |
233 | 22 | } |
234 | 0 | return matched_count; |
235 | 0 | } _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm Line | Count | Source | 157 | 36 | int _match_event_list(size_t& start_row, size_t row_count) const { | 158 | 36 | int matched_count = 0; | 159 | 36 | DateValueType end_timestamp; | 160 | | | 161 | 36 | if (window < 0) { | 162 | 0 | throw Exception(ErrorCode::INVALID_ARGUMENT, | 163 | 0 | "the sliding time window must be a positive integer, but got: {}", | 164 | 0 | window); | 165 | 0 | } | 166 | 36 | TimeInterval interval(SECOND, window, false); | 167 | 36 | int column_idx = 0; | 168 | 36 | const auto& timestamp_data = events_list.dt; | 169 | 36 | const auto& first_event_data = events_list.event_columns_data[column_idx].data(); | 170 | 36 | auto match_row = simd::find_one(first_event_data, start_row, row_count); | 171 | 36 | start_row = match_row + 1; | 172 | 36 | if (match_row < row_count) { | 173 | 22 | auto prev_timestamp = timestamp_data[match_row]; | 174 | 22 | end_timestamp = prev_timestamp; | 175 | 22 | end_timestamp.template date_add_interval<SECOND>(interval); | 176 | | | 177 | 22 | matched_count++; | 178 | 22 | column_idx++; | 179 | 22 | auto last_match_row = match_row; | 180 | 22 | ++match_row; | 181 | 62 | for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) { | 182 | 54 | const auto& event_data = events_list.event_columns_data[column_idx]; | 183 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) { | 184 | | if (event_data[match_row] == 1) { | 185 | | auto current_timestamp = timestamp_data[match_row]; | 186 | | if (current_timestamp <= end_timestamp) { | 187 | | matched_count++; | 188 | | continue; | 189 | | } | 190 | | } | 191 | | break; | 192 | | } | 193 | 54 | match_row = simd::find_one(event_data.data(), match_row, row_count); | 194 | 54 | if (match_row < row_count) { | 195 | 54 | auto current_timestamp = timestamp_data[match_row]; | 196 | 54 | bool is_matched = current_timestamp <= end_timestamp; | 197 | 54 | if (is_matched) { | 198 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { | 199 | | is_matched = current_timestamp > prev_timestamp; | 200 | | } | 201 | 40 | } | 202 | 54 | if (!is_matched) { | 203 | 14 | break; | 204 | 14 | } | 205 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) { | 206 | | prev_timestamp = timestamp_data[match_row]; | 207 | | } | 208 | | if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) { | 209 | | bool is_dup = false; | 210 | | if (match_row != last_match_row + 1) { | 211 | | for (int tmp_column_idx = 0; tmp_column_idx < column_idx; | 212 | | tmp_column_idx++) { | 213 | | const auto& tmp_event_data = | 214 | | events_list.event_columns_data[tmp_column_idx].data(); | 215 | | auto dup_match_row = simd::find_one(tmp_event_data, | 216 | | last_match_row + 1, match_row); | 217 | | if (dup_match_row < match_row) { | 218 | | is_dup = true; | 219 | | break; | 220 | | } | 221 | | } | 222 | | } | 223 | | if (is_dup) { | 224 | | break; | 225 | | } | 226 | | last_match_row = match_row; | 227 | | } | 228 | 40 | matched_count++; | 229 | 40 | } else { | 230 | 0 | break; | 231 | 0 | } | 232 | 54 | } | 233 | 22 | } | 234 | 36 | return matched_count; | 235 | 36 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm |
236 | | |
237 | | template <WindowFunnelMode WINDOW_FUNNEL_MODE> |
238 | 22 | int _get_internal() const { |
239 | 22 | size_t start_row = 0; |
240 | 22 | int max_found_event_count = 0; |
241 | 22 | auto row_count = events_list.size(); |
242 | 50 | while (start_row < row_count) { |
243 | 36 | auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count); |
244 | 36 | if (found_event_count == event_count) { |
245 | 8 | return found_event_count; |
246 | 8 | } |
247 | 28 | max_found_event_count = std::max(max_found_event_count, found_event_count); |
248 | 28 | } |
249 | 14 | return max_found_event_count; |
250 | 22 | } _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE1EEEiv Line | Count | Source | 238 | 22 | int _get_internal() const { | 239 | 22 | size_t start_row = 0; | 240 | 22 | int max_found_event_count = 0; | 241 | 22 | auto row_count = events_list.size(); | 242 | 50 | while (start_row < row_count) { | 243 | 36 | auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count); | 244 | 36 | if (found_event_count == event_count) { | 245 | 8 | return found_event_count; | 246 | 8 | } | 247 | 28 | max_found_event_count = std::max(max_found_event_count, found_event_count); | 248 | 28 | } | 249 | 14 | return max_found_event_count; | 250 | 22 | } |
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE2EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE3EEEiv Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE4EEEiv |
251 | 24 | int get() const { |
252 | 24 | auto row_count = events_list.size(); |
253 | 24 | if (event_count == 0 || row_count == 0) { |
254 | 2 | return 0; |
255 | 2 | } |
256 | 22 | switch (window_funnel_mode) { |
257 | 22 | case WindowFunnelMode::DEFAULT: |
258 | 22 | return _get_internal<WindowFunnelMode::DEFAULT>(); |
259 | 0 | case WindowFunnelMode::DEDUPLICATION: |
260 | 0 | return _get_internal<WindowFunnelMode::DEDUPLICATION>(); |
261 | 0 | case WindowFunnelMode::FIXED: |
262 | 0 | return _get_internal<WindowFunnelMode::FIXED>(); |
263 | 0 | case WindowFunnelMode::INCREASE: |
264 | 0 | return _get_internal<WindowFunnelMode::INCREASE>(); |
265 | 0 | default: |
266 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode"); |
267 | 0 | return 0; |
268 | 22 | } |
269 | 22 | } |
270 | | |
271 | 11 | void merge(const WindowFunnelState& other) { |
272 | 11 | if (other.events_list.empty()) { |
273 | 1 | return; |
274 | 1 | } |
275 | 10 | events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt), |
276 | 10 | std::end(other.events_list.dt)); |
277 | 50 | for (size_t i = 0; i < event_count; i++) { |
278 | 40 | events_list.event_columns_data[i].insert( |
279 | 40 | std::end(events_list.event_columns_data[i]), |
280 | 40 | std::begin(other.events_list.event_columns_data[i]), |
281 | 40 | std::end(other.events_list.event_columns_data[i])); |
282 | 40 | } |
283 | 10 | event_count = event_count > 0 ? event_count : other.event_count; |
284 | 10 | window = window > 0 ? window : other.window; |
285 | 10 | if (enable_mode) { |
286 | 0 | window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID |
287 | 0 | ? other.window_funnel_mode |
288 | 0 | : window_funnel_mode; |
289 | 10 | } else { |
290 | 10 | window_funnel_mode = WindowFunnelMode::DEFAULT; |
291 | 10 | } |
292 | 10 | } |
293 | | |
294 | 2 | void write(BufferWritable& out) const { |
295 | 2 | write_var_int(event_count, out); |
296 | 2 | write_var_int(window, out); |
297 | 2 | if (enable_mode) { |
298 | 0 | write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode), |
299 | 0 | out); |
300 | 0 | } |
301 | 2 | auto size = events_list.size(); |
302 | 2 | write_var_int(size, out); |
303 | 4 | for (const auto& timestamp : events_list.dt) { |
304 | 4 | write_var_int(timestamp.to_date_int_val(), out); |
305 | 4 | } |
306 | 10 | for (int64_t i = 0; i < event_count; i++) { |
307 | 8 | const auto& event_columns_data = events_list.event_columns_data[i]; |
308 | 16 | for (auto event : event_columns_data) { |
309 | 16 | write_var_int(event, out); |
310 | 16 | } |
311 | 8 | } |
312 | 2 | } |
313 | | |
314 | 2 | void read(BufferReadable& in) { |
315 | 2 | int64_t event_level; |
316 | 2 | read_var_int(event_level, in); |
317 | 2 | event_count = (int)event_level; |
318 | 2 | read_var_int(window, in); |
319 | 2 | window_funnel_mode = WindowFunnelMode::DEFAULT; |
320 | 2 | if (enable_mode) { |
321 | 0 | int64_t mode; |
322 | 0 | read_var_int(mode, in); |
323 | 0 | window_funnel_mode = static_cast<WindowFunnelMode>(mode); |
324 | 0 | } |
325 | 2 | int64_t size = 0; |
326 | 2 | read_var_int(size, in); |
327 | 2 | events_list.clear(); |
328 | 2 | events_list.dt.resize(size); |
329 | 6 | for (auto i = 0; i < size; i++) { |
330 | 4 | read_var_int(*reinterpret_cast<Int64*>(&events_list.dt[i]), in); |
331 | 4 | } |
332 | 2 | events_list.event_columns_data.resize(event_count); |
333 | 10 | for (int64_t i = 0; i < event_count; i++) { |
334 | 8 | auto& event_columns_data = events_list.event_columns_data[i]; |
335 | 8 | event_columns_data.resize(size); |
336 | 24 | for (auto j = 0; j < size; j++) { |
337 | 16 | Int64 temp_value; |
338 | 16 | read_var_int(temp_value, in); |
339 | 16 | event_columns_data[j] = static_cast<UInt8>(temp_value); |
340 | 16 | } |
341 | 8 | } |
342 | 2 | } |
343 | | }; |
344 | | |
345 | | class AggregateFunctionWindowFunnel |
346 | | : public IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>, |
347 | | MultiExpression, |
348 | | NullableAggregateFunction { |
349 | | public: |
350 | | AggregateFunctionWindowFunnel(const DataTypes& argument_types_) |
351 | 6 | : IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>( |
352 | 6 | argument_types_) {} |
353 | | |
354 | 34 | void create(AggregateDataPtr __restrict place) const override { |
355 | 34 | auto data = new (place) WindowFunnelState( |
356 | 34 | cast_set<int>(IAggregateFunction::get_argument_types().size() - 3)); |
357 | | /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version` |
358 | 34 | data->enable_mode = IAggregateFunction::version >= 3; |
359 | 34 | } |
360 | | |
361 | 0 | String get_name() const override { return "window_funnel"; } |
362 | | |
363 | 0 | DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); } |
364 | | |
365 | 0 | void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); } |
366 | | |
367 | | void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, |
368 | 84 | Arena&) const override { |
369 | 84 | const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num]; |
370 | 84 | StringRef mode = columns[1]->get_data_at(row_num); |
371 | 84 | this->data(place).add(columns, row_num, window, |
372 | 84 | string_to_window_funnel_mode(mode.to_string())); |
373 | 84 | } |
374 | | |
375 | | void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, |
376 | 11 | Arena&) const override { |
377 | 11 | this->data(place).merge(this->data(rhs)); |
378 | 11 | } |
379 | | |
380 | 2 | void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { |
381 | 2 | this->data(place).write(buf); |
382 | 2 | } |
383 | | |
384 | | void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, |
385 | 2 | Arena&) const override { |
386 | 2 | this->data(place).read(buf); |
387 | 2 | } |
388 | | |
389 | 24 | void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { |
390 | | // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr. |
391 | 24 | this->data(const_cast<AggregateDataPtr>(place)).sort(); |
392 | 24 | assert_cast<ColumnInt32&>(to).get_data().push_back( |
393 | 24 | IAggregateFunctionDataHelper<WindowFunnelState, |
394 | 24 | AggregateFunctionWindowFunnel>::data(place) |
395 | 24 | .get()); |
396 | 24 | } |
397 | | |
398 | | protected: |
399 | | using IAggregateFunction::version; |
400 | | }; |
401 | | } // namespace doris |
402 | | |
403 | | #include "common/compile_check_end.h" |