Coverage Report

Created: 2026-04-14 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/aggregate/aggregate_function_window_funnel.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// This file is copied from
19
// https://github.com/ClickHouse/ClickHouse/blob/master/AggregateFunctionWindowFunnel.h
20
// and modified by Doris
21
22
#pragma once
23
24
#include <gen_cpp/data.pb.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <iterator>
29
#include <memory>
30
#include <type_traits>
31
#include <utility>
32
33
#include "common/cast_set.h"
34
#include "common/exception.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/binary_cast.hpp"
38
#include "core/column/column_string.h"
39
#include "core/data_type/data_type_number.h"
40
#include "core/types.h"
41
#include "core/value/vdatetime_value.h"
42
#include "exec/sort/sort_block.h"
43
#include "exprs/aggregate/aggregate_function.h"
44
#include "util/simd/bits.h"
45
#include "util/var_int.h"
46
47
namespace doris {
48
class Arena;
49
class BufferReadable;
50
class BufferWritable;
51
class IColumn;
52
} // namespace doris
53
54
namespace doris {
55
56
enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE };
57
58
252
inline WindowFunnelMode string_to_window_funnel_mode(const String& string) {
59
252
    if (string == "default") {
60
95
        return WindowFunnelMode::DEFAULT;
61
157
    } else if (string == "deduplication") {
62
36
        return WindowFunnelMode::DEDUPLICATION;
63
121
    } else if (string == "fixed") {
64
19
        return WindowFunnelMode::FIXED;
65
102
    } else if (string == "increase") {
66
18
        return WindowFunnelMode::INCREASE;
67
84
    } else {
68
84
        return WindowFunnelMode::INVALID;
69
84
    }
70
252
}
71
72
struct DataValue {
73
    using TimestampEvent = std::vector<ColumnUInt8::Container>;
74
    std::vector<DateV2Value<DateTimeV2ValueType>> dt;
75
    TimestampEvent event_columns_data;
76
0
    bool operator<(const DataValue& other) const { return dt < other.dt; }
77
2
    void clear() {
78
2
        dt.clear();
79
8
        for (auto& data : event_columns_data) {
80
8
            data.clear();
81
8
        }
82
2
    }
83
72
    auto size() const { return dt.size(); }
84
11
    bool empty() const { return dt.empty(); }
85
0
    std::string debug_string() const {
86
0
        std::string result = "\n" + std::to_string(dt.size()) + " " +
87
0
                             std::to_string(event_columns_data[0].size()) + "\n";
88
0
        for (size_t i = 0; i < dt.size(); ++i) {
89
0
            result += dt[i].debug_string() + " ,";
90
0
            for (const auto& event : event_columns_data) {
91
0
                result += std::to_string(event[i]) + ",";
92
0
            }
93
0
            result += "\n";
94
0
        }
95
0
        return result;
96
0
    }
97
};
98
99
struct WindowFunnelState {
100
    static constexpr PrimitiveType PType = PrimitiveType::TYPE_DATETIMEV2;
101
    using NativeType = UInt64;
102
    using DateValueType = DateV2Value<DateTimeV2ValueType>;
103
    int event_count = 0;
104
    int64_t window;
105
    bool enable_mode;
106
    WindowFunnelMode window_funnel_mode;
107
    DataValue events_list;
108
109
34
    WindowFunnelState() {
110
34
        event_count = 0;
111
34
        window = 0;
112
34
        window_funnel_mode = WindowFunnelMode::INVALID;
113
34
    }
114
34
    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
115
34
        event_count = arg_event_count;
116
34
        events_list.event_columns_data.resize(event_count);
117
34
    }
118
119
0
    void reset() { events_list.clear(); }
120
121
84
    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
122
84
        window = win;
123
84
        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
124
84
        events_list.dt.emplace_back(
125
84
                assert_cast<const ColumnVector<PType>&>(*arg_columns[2]).get_data()[row_num]);
126
420
        for (int i = 0; i < event_count; i++) {
127
336
            events_list.event_columns_data[i].emplace_back(
128
336
                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]);
129
336
        }
130
84
    }
131
132
    // todo: rethink thid sort method.
133
24
    void sort() {
134
24
        auto num = events_list.size();
135
24
        std::vector<size_t> indices(num);
136
24
        std::iota(indices.begin(), indices.end(), 0);
137
24
        std::sort(indices.begin(), indices.end(),
138
102
                  [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });
139
140
120
        auto reorder = [&indices, &num](auto& vec) {
141
120
            std::decay_t<decltype(vec)> temp;
142
120
            temp.resize(num);
143
560
            for (auto i = 0; i < num; i++) {
144
440
                temp[i] = vec[indices[i]];
145
440
            }
146
120
            std::swap(vec, temp);
147
120
        };
_ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clISt6vectorINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEESaIS8_EEEEDaS2_
Line
Count
Source
140
24
        auto reorder = [&indices, &num](auto& vec) {
141
24
            std::decay_t<decltype(vec)> temp;
142
24
            temp.resize(num);
143
112
            for (auto i = 0; i < num; i++) {
144
88
                temp[i] = vec[indices[i]];
145
88
            }
146
24
            std::swap(vec, temp);
147
24
        };
_ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS2_
Line
Count
Source
140
96
        auto reorder = [&indices, &num](auto& vec) {
141
96
            std::decay_t<decltype(vec)> temp;
142
96
            temp.resize(num);
143
448
            for (auto i = 0; i < num; i++) {
144
352
                temp[i] = vec[indices[i]];
145
352
            }
146
96
            std::swap(vec, temp);
147
96
        };
148
149
24
        reorder(events_list.dt);
150
96
        for (auto& inner_vec : events_list.event_columns_data) {
151
96
            reorder(inner_vec);
152
96
        }
153
24
    }
154
155
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
156
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
157
36
        int matched_count = 0;
158
36
        DateValueType end_timestamp;
159
160
36
        if (window < 0) {
161
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
162
0
                            "the sliding time window must be a positive integer, but got: {}",
163
0
                            window);
164
0
        }
165
36
        TimeInterval interval(SECOND, window, false);
166
36
        int column_idx = 0;
167
36
        const auto& timestamp_data = events_list.dt;
168
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
169
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
170
36
        start_row = match_row + 1;
171
36
        if (match_row < row_count) {
172
22
            auto prev_timestamp = timestamp_data[match_row];
173
22
            end_timestamp = prev_timestamp;
174
22
            end_timestamp.template date_add_interval<SECOND>(interval);
175
176
22
            matched_count++;
177
22
            column_idx++;
178
22
            auto last_match_row = match_row;
179
22
            ++match_row;
180
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
181
54
                const auto& event_data = events_list.event_columns_data[column_idx];
182
54
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
183
0
                    if (event_data[match_row] == 1) {
184
0
                        auto current_timestamp = timestamp_data[match_row];
185
0
                        if (current_timestamp <= end_timestamp) {
186
0
                            matched_count++;
187
0
                            continue;
188
0
                        }
189
0
                    }
190
0
                    break;
191
0
                }
192
0
                match_row = simd::find_one(event_data.data(), match_row, row_count);
193
54
                if (match_row < row_count) {
194
54
                    auto current_timestamp = timestamp_data[match_row];
195
54
                    bool is_matched = current_timestamp <= end_timestamp;
196
54
                    if (is_matched) {
197
40
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
198
0
                            is_matched = current_timestamp > prev_timestamp;
199
0
                        }
200
40
                    }
201
54
                    if (!is_matched) {
202
14
                        break;
203
14
                    }
204
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
205
0
                        prev_timestamp = timestamp_data[match_row];
206
0
                    }
207
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
208
0
                        bool is_dup = false;
209
0
                        if (match_row != last_match_row + 1) {
210
0
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
211
0
                                 tmp_column_idx++) {
212
0
                                const auto& tmp_event_data =
213
0
                                        events_list.event_columns_data[tmp_column_idx].data();
214
0
                                auto dup_match_row = simd::find_one(tmp_event_data,
215
0
                                                                    last_match_row + 1, match_row);
216
0
                                if (dup_match_row < match_row) {
217
0
                                    is_dup = true;
218
0
                                    break;
219
0
                                }
220
0
                            }
221
0
                        }
222
0
                        if (is_dup) {
223
0
                            break;
224
0
                        }
225
0
                        last_match_row = match_row;
226
0
                    }
227
0
                    matched_count++;
228
40
                } else {
229
0
                    break;
230
0
                }
231
54
            }
232
22
        }
233
0
        return matched_count;
234
0
    }
_ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm
Line
Count
Source
156
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
157
36
        int matched_count = 0;
158
36
        DateValueType end_timestamp;
159
160
36
        if (window < 0) {
161
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
162
0
                            "the sliding time window must be a positive integer, but got: {}",
163
0
                            window);
164
0
        }
165
36
        TimeInterval interval(SECOND, window, false);
166
36
        int column_idx = 0;
167
36
        const auto& timestamp_data = events_list.dt;
168
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
169
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
170
36
        start_row = match_row + 1;
171
36
        if (match_row < row_count) {
172
22
            auto prev_timestamp = timestamp_data[match_row];
173
22
            end_timestamp = prev_timestamp;
174
22
            end_timestamp.template date_add_interval<SECOND>(interval);
175
176
22
            matched_count++;
177
22
            column_idx++;
178
22
            auto last_match_row = match_row;
179
22
            ++match_row;
180
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
181
54
                const auto& event_data = events_list.event_columns_data[column_idx];
182
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
183
                    if (event_data[match_row] == 1) {
184
                        auto current_timestamp = timestamp_data[match_row];
185
                        if (current_timestamp <= end_timestamp) {
186
                            matched_count++;
187
                            continue;
188
                        }
189
                    }
190
                    break;
191
                }
192
54
                match_row = simd::find_one(event_data.data(), match_row, row_count);
193
54
                if (match_row < row_count) {
194
54
                    auto current_timestamp = timestamp_data[match_row];
195
54
                    bool is_matched = current_timestamp <= end_timestamp;
196
54
                    if (is_matched) {
197
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
198
                            is_matched = current_timestamp > prev_timestamp;
199
                        }
200
40
                    }
201
54
                    if (!is_matched) {
202
14
                        break;
203
14
                    }
204
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
205
                        prev_timestamp = timestamp_data[match_row];
206
                    }
207
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
208
                        bool is_dup = false;
209
                        if (match_row != last_match_row + 1) {
210
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
211
                                 tmp_column_idx++) {
212
                                const auto& tmp_event_data =
213
                                        events_list.event_columns_data[tmp_column_idx].data();
214
                                auto dup_match_row = simd::find_one(tmp_event_data,
215
                                                                    last_match_row + 1, match_row);
216
                                if (dup_match_row < match_row) {
217
                                    is_dup = true;
218
                                    break;
219
                                }
220
                            }
221
                        }
222
                        if (is_dup) {
223
                            break;
224
                        }
225
                        last_match_row = match_row;
226
                    }
227
40
                    matched_count++;
228
40
                } else {
229
0
                    break;
230
0
                }
231
54
            }
232
22
        }
233
36
        return matched_count;
234
36
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm
235
236
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
237
22
    int _get_internal() const {
238
22
        size_t start_row = 0;
239
22
        int max_found_event_count = 0;
240
22
        auto row_count = events_list.size();
241
50
        while (start_row < row_count) {
242
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
243
36
            if (found_event_count == event_count) {
244
8
                return found_event_count;
245
8
            }
246
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
247
28
        }
248
14
        return max_found_event_count;
249
22
    }
_ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE1EEEiv
Line
Count
Source
237
22
    int _get_internal() const {
238
22
        size_t start_row = 0;
239
22
        int max_found_event_count = 0;
240
22
        auto row_count = events_list.size();
241
50
        while (start_row < row_count) {
242
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
243
36
            if (found_event_count == event_count) {
244
8
                return found_event_count;
245
8
            }
246
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
247
28
        }
248
14
        return max_found_event_count;
249
22
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE2EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE3EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE4EEEiv
250
24
    int get() const {
251
24
        auto row_count = events_list.size();
252
24
        if (event_count == 0 || row_count == 0) {
253
2
            return 0;
254
2
        }
255
22
        switch (window_funnel_mode) {
256
22
        case WindowFunnelMode::DEFAULT:
257
22
            return _get_internal<WindowFunnelMode::DEFAULT>();
258
0
        case WindowFunnelMode::DEDUPLICATION:
259
0
            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
260
0
        case WindowFunnelMode::FIXED:
261
0
            return _get_internal<WindowFunnelMode::FIXED>();
262
0
        case WindowFunnelMode::INCREASE:
263
0
            return _get_internal<WindowFunnelMode::INCREASE>();
264
0
        default:
265
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode");
266
0
            return 0;
267
22
        }
268
22
    }
269
270
11
    void merge(const WindowFunnelState& other) {
271
11
        if (other.events_list.empty()) {
272
1
            return;
273
1
        }
274
10
        events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt),
275
10
                              std::end(other.events_list.dt));
276
50
        for (size_t i = 0; i < event_count; i++) {
277
40
            events_list.event_columns_data[i].insert(
278
40
                    std::end(events_list.event_columns_data[i]),
279
40
                    std::begin(other.events_list.event_columns_data[i]),
280
40
                    std::end(other.events_list.event_columns_data[i]));
281
40
        }
282
10
        event_count = event_count > 0 ? event_count : other.event_count;
283
10
        window = window > 0 ? window : other.window;
284
10
        if (enable_mode) {
285
0
            window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
286
0
                                         ? other.window_funnel_mode
287
0
                                         : window_funnel_mode;
288
10
        } else {
289
10
            window_funnel_mode = WindowFunnelMode::DEFAULT;
290
10
        }
291
10
    }
292
293
2
    void write(BufferWritable& out) const {
294
2
        write_var_int(event_count, out);
295
2
        write_var_int(window, out);
296
2
        if (enable_mode) {
297
0
            write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
298
0
                          out);
299
0
        }
300
2
        auto size = events_list.size();
301
2
        write_var_int(size, out);
302
4
        for (const auto& timestamp : events_list.dt) {
303
4
            write_var_int(timestamp.to_date_int_val(), out);
304
4
        }
305
10
        for (int64_t i = 0; i < event_count; i++) {
306
8
            const auto& event_columns_data = events_list.event_columns_data[i];
307
16
            for (auto event : event_columns_data) {
308
16
                write_var_int(event, out);
309
16
            }
310
8
        }
311
2
    }
312
313
2
    void read(BufferReadable& in) {
314
2
        int64_t event_level;
315
2
        read_var_int(event_level, in);
316
2
        event_count = (int)event_level;
317
2
        read_var_int(window, in);
318
2
        window_funnel_mode = WindowFunnelMode::DEFAULT;
319
2
        if (enable_mode) {
320
0
            int64_t mode;
321
0
            read_var_int(mode, in);
322
0
            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
323
0
        }
324
2
        int64_t size = 0;
325
2
        read_var_int(size, in);
326
2
        events_list.clear();
327
2
        events_list.dt.resize(size);
328
6
        for (auto i = 0; i < size; i++) {
329
4
            read_var_int(*reinterpret_cast<Int64*>(&events_list.dt[i]), in);
330
4
        }
331
2
        events_list.event_columns_data.resize(event_count);
332
10
        for (int64_t i = 0; i < event_count; i++) {
333
8
            auto& event_columns_data = events_list.event_columns_data[i];
334
8
            event_columns_data.resize(size);
335
24
            for (auto j = 0; j < size; j++) {
336
16
                Int64 temp_value;
337
16
                read_var_int(temp_value, in);
338
16
                event_columns_data[j] = static_cast<UInt8>(temp_value);
339
16
            }
340
8
        }
341
2
    }
342
};
343
344
class AggregateFunctionWindowFunnel
345
        : public IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>,
346
          MultiExpression,
347
          NullableAggregateFunction {
348
public:
349
    AggregateFunctionWindowFunnel(const DataTypes& argument_types_)
350
6
            : IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>(
351
6
                      argument_types_) {}
352
353
34
    void create(AggregateDataPtr __restrict place) const override {
354
34
        auto data = new (place) WindowFunnelState(
355
34
                cast_set<int>(IAggregateFunction::get_argument_types().size() - 3));
356
        /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
357
34
        data->enable_mode = IAggregateFunction::version >= 3;
358
34
    }
359
360
0
    String get_name() const override { return "window_funnel"; }
361
362
0
    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); }
363
364
0
    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
365
366
    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
367
84
             Arena&) const override {
368
84
        const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num];
369
84
        StringRef mode = columns[1]->get_data_at(row_num);
370
84
        this->data(place).add(columns, row_num, window,
371
84
                              string_to_window_funnel_mode(mode.to_string()));
372
84
    }
373
374
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
375
11
               Arena&) const override {
376
11
        this->data(place).merge(this->data(rhs));
377
11
    }
378
379
2
    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
380
2
        this->data(place).write(buf);
381
2
    }
382
383
    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
384
2
                     Arena&) const override {
385
2
        this->data(place).read(buf);
386
2
    }
387
388
24
    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
389
        // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr.
390
24
        this->data(const_cast<AggregateDataPtr>(place)).sort();
391
24
        assert_cast<ColumnInt32&>(to).get_data().push_back(
392
24
                IAggregateFunctionDataHelper<WindowFunnelState,
393
24
                                             AggregateFunctionWindowFunnel>::data(place)
394
24
                        .get());
395
24
    }
396
397
protected:
398
    using IAggregateFunction::version;
399
};
400
} // namespace doris