Coverage Report

Created: 2026-03-17 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/aggregate/aggregate_function_window_funnel.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// This file is copied from
19
// https://github.com/ClickHouse/ClickHouse/blob/master/AggregateFunctionWindowFunnel.h
20
// and modified by Doris
21
22
#pragma once
23
24
#include <gen_cpp/data.pb.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <iterator>
29
#include <memory>
30
#include <type_traits>
31
#include <utility>
32
33
#include "common/cast_set.h"
34
#include "common/exception.h"
35
#include "common/status.h"
36
#include "core/assert_cast.h"
37
#include "core/binary_cast.hpp"
38
#include "core/column/column_string.h"
39
#include "core/data_type/data_type_number.h"
40
#include "core/types.h"
41
#include "core/value/vdatetime_value.h"
42
#include "exec/sort/sort_block.h"
43
#include "exprs/aggregate/aggregate_function.h"
44
#include "util/simd/bits.h"
45
#include "util/var_int.h"
46
47
namespace doris {
48
#include "common/compile_check_begin.h"
49
class Arena;
50
class BufferReadable;
51
class BufferWritable;
52
class IColumn;
53
} // namespace doris
54
55
namespace doris {
56
57
enum class WindowFunnelMode : Int64 { INVALID, DEFAULT, DEDUPLICATION, FIXED, INCREASE };
58
59
84
WindowFunnelMode string_to_window_funnel_mode(const String& string) {
60
84
    if (string == "default") {
61
0
        return WindowFunnelMode::DEFAULT;
62
84
    } else if (string == "deduplication") {
63
0
        return WindowFunnelMode::DEDUPLICATION;
64
84
    } else if (string == "fixed") {
65
0
        return WindowFunnelMode::FIXED;
66
84
    } else if (string == "increase") {
67
0
        return WindowFunnelMode::INCREASE;
68
84
    } else {
69
84
        return WindowFunnelMode::INVALID;
70
84
    }
71
84
}
72
73
struct DataValue {
74
    using TimestampEvent = std::vector<ColumnUInt8::Container>;
75
    std::vector<DateV2Value<DateTimeV2ValueType>> dt;
76
    TimestampEvent event_columns_data;
77
0
    bool operator<(const DataValue& other) const { return dt < other.dt; }
78
2
    void clear() {
79
2
        dt.clear();
80
8
        for (auto& data : event_columns_data) {
81
8
            data.clear();
82
8
        }
83
2
    }
84
72
    auto size() const { return dt.size(); }
85
11
    bool empty() const { return dt.empty(); }
86
0
    std::string debug_string() const {
87
0
        std::string result = "\n" + std::to_string(dt.size()) + " " +
88
0
                             std::to_string(event_columns_data[0].size()) + "\n";
89
0
        for (size_t i = 0; i < dt.size(); ++i) {
90
0
            result += dt[i].debug_string() + " ,";
91
0
            for (const auto& event : event_columns_data) {
92
0
                result += std::to_string(event[i]) + ",";
93
0
            }
94
0
            result += "\n";
95
0
        }
96
0
        return result;
97
0
    }
98
};
99
100
struct WindowFunnelState {
101
    static constexpr PrimitiveType PType = PrimitiveType::TYPE_DATETIMEV2;
102
    using NativeType = UInt64;
103
    using DateValueType = DateV2Value<DateTimeV2ValueType>;
104
    int event_count = 0;
105
    int64_t window;
106
    bool enable_mode;
107
    WindowFunnelMode window_funnel_mode;
108
    DataValue events_list;
109
110
34
    WindowFunnelState() {
111
34
        event_count = 0;
112
34
        window = 0;
113
34
        window_funnel_mode = WindowFunnelMode::INVALID;
114
34
    }
115
34
    WindowFunnelState(int arg_event_count) : WindowFunnelState() {
116
34
        event_count = arg_event_count;
117
34
        events_list.event_columns_data.resize(event_count);
118
34
    }
119
120
0
    void reset() { events_list.clear(); }
121
122
84
    void add(const IColumn** arg_columns, ssize_t row_num, int64_t win, WindowFunnelMode mode) {
123
84
        window = win;
124
84
        window_funnel_mode = enable_mode ? mode : WindowFunnelMode::DEFAULT;
125
84
        events_list.dt.emplace_back(
126
84
                assert_cast<const ColumnVector<PType>&>(*arg_columns[2]).get_data()[row_num]);
127
420
        for (int i = 0; i < event_count; i++) {
128
336
            events_list.event_columns_data[i].emplace_back(
129
336
                    assert_cast<const ColumnUInt8&>(*arg_columns[3 + i]).get_data()[row_num]);
130
336
        }
131
84
    }
132
133
    // TODO: rethink this sort method.
134
24
    void sort() {
135
24
        auto num = events_list.size();
136
24
        std::vector<size_t> indices(num);
137
24
        std::iota(indices.begin(), indices.end(), 0);
138
24
        std::sort(indices.begin(), indices.end(),
139
102
                  [this](size_t i1, size_t i2) { return events_list.dt[i1] < events_list.dt[i2]; });
140
141
120
        auto reorder = [&indices, &num](auto& vec) {
142
120
            std::decay_t<decltype(vec)> temp;
143
120
            temp.resize(num);
144
560
            for (auto i = 0; i < num; i++) {
145
440
                temp[i] = vec[indices[i]];
146
440
            }
147
120
            std::swap(vec, temp);
148
120
        };
_ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clISt6vectorINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEESaIS8_EEEEDaS2_
Line
Count
Source
141
24
        auto reorder = [&indices, &num](auto& vec) {
142
24
            std::decay_t<decltype(vec)> temp;
143
24
            temp.resize(num);
144
112
            for (auto i = 0; i < num; i++) {
145
88
                temp[i] = vec[indices[i]];
146
88
            }
147
24
            std::swap(vec, temp);
148
24
        };
_ZZN5doris17WindowFunnelState4sortEvENKUlRT_E_clINS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEEEEDaS2_
Line
Count
Source
141
96
        auto reorder = [&indices, &num](auto& vec) {
142
96
            std::decay_t<decltype(vec)> temp;
143
96
            temp.resize(num);
144
448
            for (auto i = 0; i < num; i++) {
145
352
                temp[i] = vec[indices[i]];
146
352
            }
147
96
            std::swap(vec, temp);
148
96
        };
149
150
24
        reorder(events_list.dt);
151
96
        for (auto& inner_vec : events_list.event_columns_data) {
152
96
            reorder(inner_vec);
153
96
        }
154
24
    }
155
156
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
157
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
158
36
        int matched_count = 0;
159
36
        DateValueType end_timestamp;
160
161
36
        if (window < 0) {
162
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
163
0
                            "the sliding time window must be a positive integer, but got: {}",
164
0
                            window);
165
0
        }
166
36
        TimeInterval interval(SECOND, window, false);
167
36
        int column_idx = 0;
168
36
        const auto& timestamp_data = events_list.dt;
169
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
170
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
171
36
        start_row = match_row + 1;
172
36
        if (match_row < row_count) {
173
22
            auto prev_timestamp = timestamp_data[match_row];
174
22
            end_timestamp = prev_timestamp;
175
22
            end_timestamp.template date_add_interval<SECOND>(interval);
176
177
22
            matched_count++;
178
22
            column_idx++;
179
22
            auto last_match_row = match_row;
180
22
            ++match_row;
181
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
182
54
                const auto& event_data = events_list.event_columns_data[column_idx];
183
54
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
184
0
                    if (event_data[match_row] == 1) {
185
0
                        auto current_timestamp = timestamp_data[match_row];
186
0
                        if (current_timestamp <= end_timestamp) {
187
0
                            matched_count++;
188
0
                            continue;
189
0
                        }
190
0
                    }
191
0
                    break;
192
0
                }
193
0
                match_row = simd::find_one(event_data.data(), match_row, row_count);
194
54
                if (match_row < row_count) {
195
54
                    auto current_timestamp = timestamp_data[match_row];
196
54
                    bool is_matched = current_timestamp <= end_timestamp;
197
54
                    if (is_matched) {
198
40
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
199
0
                            is_matched = current_timestamp > prev_timestamp;
200
0
                        }
201
40
                    }
202
54
                    if (!is_matched) {
203
14
                        break;
204
14
                    }
205
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
206
0
                        prev_timestamp = timestamp_data[match_row];
207
0
                    }
208
40
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
209
0
                        bool is_dup = false;
210
0
                        if (match_row != last_match_row + 1) {
211
0
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
212
0
                                 tmp_column_idx++) {
213
0
                                const auto& tmp_event_data =
214
0
                                        events_list.event_columns_data[tmp_column_idx].data();
215
0
                                auto dup_match_row = simd::find_one(tmp_event_data,
216
0
                                                                    last_match_row + 1, match_row);
217
0
                                if (dup_match_row < match_row) {
218
0
                                    is_dup = true;
219
0
                                    break;
220
0
                                }
221
0
                            }
222
0
                        }
223
0
                        if (is_dup) {
224
0
                            break;
225
0
                        }
226
0
                        last_match_row = match_row;
227
0
                    }
228
0
                    matched_count++;
229
40
                } else {
230
0
                    break;
231
0
                }
232
54
            }
233
22
        }
234
0
        return matched_count;
235
0
    }
_ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE1EEEiRmm
Line
Count
Source
157
36
    int _match_event_list(size_t& start_row, size_t row_count) const {
158
36
        int matched_count = 0;
159
36
        DateValueType end_timestamp;
160
161
36
        if (window < 0) {
162
0
            throw Exception(ErrorCode::INVALID_ARGUMENT,
163
0
                            "the sliding time window must be a positive integer, but got: {}",
164
0
                            window);
165
0
        }
166
36
        TimeInterval interval(SECOND, window, false);
167
36
        int column_idx = 0;
168
36
        const auto& timestamp_data = events_list.dt;
169
36
        const auto& first_event_data = events_list.event_columns_data[column_idx].data();
170
36
        auto match_row = simd::find_one(first_event_data, start_row, row_count);
171
36
        start_row = match_row + 1;
172
36
        if (match_row < row_count) {
173
22
            auto prev_timestamp = timestamp_data[match_row];
174
22
            end_timestamp = prev_timestamp;
175
22
            end_timestamp.template date_add_interval<SECOND>(interval);
176
177
22
            matched_count++;
178
22
            column_idx++;
179
22
            auto last_match_row = match_row;
180
22
            ++match_row;
181
62
            for (; column_idx < event_count && match_row < row_count; column_idx++, match_row++) {
182
54
                const auto& event_data = events_list.event_columns_data[column_idx];
183
                if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::FIXED) {
184
                    if (event_data[match_row] == 1) {
185
                        auto current_timestamp = timestamp_data[match_row];
186
                        if (current_timestamp <= end_timestamp) {
187
                            matched_count++;
188
                            continue;
189
                        }
190
                    }
191
                    break;
192
                }
193
54
                match_row = simd::find_one(event_data.data(), match_row, row_count);
194
54
                if (match_row < row_count) {
195
54
                    auto current_timestamp = timestamp_data[match_row];
196
54
                    bool is_matched = current_timestamp <= end_timestamp;
197
54
                    if (is_matched) {
198
                        if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
199
                            is_matched = current_timestamp > prev_timestamp;
200
                        }
201
40
                    }
202
54
                    if (!is_matched) {
203
14
                        break;
204
14
                    }
205
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::INCREASE) {
206
                        prev_timestamp = timestamp_data[match_row];
207
                    }
208
                    if constexpr (WINDOW_FUNNEL_MODE == WindowFunnelMode::DEDUPLICATION) {
209
                        bool is_dup = false;
210
                        if (match_row != last_match_row + 1) {
211
                            for (int tmp_column_idx = 0; tmp_column_idx < column_idx;
212
                                 tmp_column_idx++) {
213
                                const auto& tmp_event_data =
214
                                        events_list.event_columns_data[tmp_column_idx].data();
215
                                auto dup_match_row = simd::find_one(tmp_event_data,
216
                                                                    last_match_row + 1, match_row);
217
                                if (dup_match_row < match_row) {
218
                                    is_dup = true;
219
                                    break;
220
                                }
221
                            }
222
                        }
223
                        if (is_dup) {
224
                            break;
225
                        }
226
                        last_match_row = match_row;
227
                    }
228
40
                    matched_count++;
229
40
                } else {
230
0
                    break;
231
0
                }
232
54
            }
233
22
        }
234
36
        return matched_count;
235
36
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE2EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE3EEEiRmm
Unexecuted instantiation: _ZNK5doris17WindowFunnelState17_match_event_listILNS_16WindowFunnelModeE4EEEiRmm
236
237
    template <WindowFunnelMode WINDOW_FUNNEL_MODE>
238
22
    int _get_internal() const {
239
22
        size_t start_row = 0;
240
22
        int max_found_event_count = 0;
241
22
        auto row_count = events_list.size();
242
50
        while (start_row < row_count) {
243
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
244
36
            if (found_event_count == event_count) {
245
8
                return found_event_count;
246
8
            }
247
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
248
28
        }
249
14
        return max_found_event_count;
250
22
    }
_ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE1EEEiv
Line
Count
Source
238
22
    int _get_internal() const {
239
22
        size_t start_row = 0;
240
22
        int max_found_event_count = 0;
241
22
        auto row_count = events_list.size();
242
50
        while (start_row < row_count) {
243
36
            auto found_event_count = _match_event_list<WINDOW_FUNNEL_MODE>(start_row, row_count);
244
36
            if (found_event_count == event_count) {
245
8
                return found_event_count;
246
8
            }
247
28
            max_found_event_count = std::max(max_found_event_count, found_event_count);
248
28
        }
249
14
        return max_found_event_count;
250
22
    }
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE2EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE3EEEiv
Unexecuted instantiation: _ZNK5doris17WindowFunnelState13_get_internalILNS_16WindowFunnelModeE4EEEiv
251
24
    int get() const {
252
24
        auto row_count = events_list.size();
253
24
        if (event_count == 0 || row_count == 0) {
254
2
            return 0;
255
2
        }
256
22
        switch (window_funnel_mode) {
257
22
        case WindowFunnelMode::DEFAULT:
258
22
            return _get_internal<WindowFunnelMode::DEFAULT>();
259
0
        case WindowFunnelMode::DEDUPLICATION:
260
0
            return _get_internal<WindowFunnelMode::DEDUPLICATION>();
261
0
        case WindowFunnelMode::FIXED:
262
0
            return _get_internal<WindowFunnelMode::FIXED>();
263
0
        case WindowFunnelMode::INCREASE:
264
0
            return _get_internal<WindowFunnelMode::INCREASE>();
265
0
        default:
266
0
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Invalid window_funnel mode");
267
0
            return 0;
268
22
        }
269
22
    }
270
271
11
    void merge(const WindowFunnelState& other) {
272
11
        if (other.events_list.empty()) {
273
1
            return;
274
1
        }
275
10
        events_list.dt.insert(std::end(events_list.dt), std::begin(other.events_list.dt),
276
10
                              std::end(other.events_list.dt));
277
50
        for (size_t i = 0; i < event_count; i++) {
278
40
            events_list.event_columns_data[i].insert(
279
40
                    std::end(events_list.event_columns_data[i]),
280
40
                    std::begin(other.events_list.event_columns_data[i]),
281
40
                    std::end(other.events_list.event_columns_data[i]));
282
40
        }
283
10
        event_count = event_count > 0 ? event_count : other.event_count;
284
10
        window = window > 0 ? window : other.window;
285
10
        if (enable_mode) {
286
0
            window_funnel_mode = window_funnel_mode == WindowFunnelMode::INVALID
287
0
                                         ? other.window_funnel_mode
288
0
                                         : window_funnel_mode;
289
10
        } else {
290
10
            window_funnel_mode = WindowFunnelMode::DEFAULT;
291
10
        }
292
10
    }
293
294
2
    void write(BufferWritable& out) const {
295
2
        write_var_int(event_count, out);
296
2
        write_var_int(window, out);
297
2
        if (enable_mode) {
298
0
            write_var_int(static_cast<std::underlying_type_t<WindowFunnelMode>>(window_funnel_mode),
299
0
                          out);
300
0
        }
301
2
        auto size = events_list.size();
302
2
        write_var_int(size, out);
303
4
        for (const auto& timestamp : events_list.dt) {
304
4
            write_var_int(timestamp.to_date_int_val(), out);
305
4
        }
306
10
        for (int64_t i = 0; i < event_count; i++) {
307
8
            const auto& event_columns_data = events_list.event_columns_data[i];
308
16
            for (auto event : event_columns_data) {
309
16
                write_var_int(event, out);
310
16
            }
311
8
        }
312
2
    }
313
314
2
    void read(BufferReadable& in) {
315
2
        int64_t event_level;
316
2
        read_var_int(event_level, in);
317
2
        event_count = (int)event_level;
318
2
        read_var_int(window, in);
319
2
        window_funnel_mode = WindowFunnelMode::DEFAULT;
320
2
        if (enable_mode) {
321
0
            int64_t mode;
322
0
            read_var_int(mode, in);
323
0
            window_funnel_mode = static_cast<WindowFunnelMode>(mode);
324
0
        }
325
2
        int64_t size = 0;
326
2
        read_var_int(size, in);
327
2
        events_list.clear();
328
2
        events_list.dt.resize(size);
329
6
        for (auto i = 0; i < size; i++) {
330
4
            read_var_int(*reinterpret_cast<Int64*>(&events_list.dt[i]), in);
331
4
        }
332
2
        events_list.event_columns_data.resize(event_count);
333
10
        for (int64_t i = 0; i < event_count; i++) {
334
8
            auto& event_columns_data = events_list.event_columns_data[i];
335
8
            event_columns_data.resize(size);
336
24
            for (auto j = 0; j < size; j++) {
337
16
                Int64 temp_value;
338
16
                read_var_int(temp_value, in);
339
16
                event_columns_data[j] = static_cast<UInt8>(temp_value);
340
16
            }
341
8
        }
342
2
    }
343
};
344
345
class AggregateFunctionWindowFunnel
346
        : public IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>,
347
          MultiExpression,
348
          NullableAggregateFunction {
349
public:
350
    AggregateFunctionWindowFunnel(const DataTypes& argument_types_)
351
6
            : IAggregateFunctionDataHelper<WindowFunnelState, AggregateFunctionWindowFunnel>(
352
6
                      argument_types_) {}
353
354
34
    void create(AggregateDataPtr __restrict place) const override {
355
34
        auto data = new (place) WindowFunnelState(
356
34
                cast_set<int>(IAggregateFunction::get_argument_types().size() - 3));
357
        /// support window funnel mode from 2.0. See `BeExecVersionManager::max_be_exec_version`
358
34
        data->enable_mode = IAggregateFunction::version >= 3;
359
34
    }
360
361
0
    String get_name() const override { return "window_funnel"; }
362
363
0
    DataTypePtr get_return_type() const override { return std::make_shared<DataTypeInt32>(); }
364
365
0
    void reset(AggregateDataPtr __restrict place) const override { this->data(place).reset(); }
366
367
    void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num,
368
84
             Arena&) const override {
369
84
        const auto& window = assert_cast<const ColumnInt64&>(*columns[0]).get_data()[row_num];
370
84
        StringRef mode = columns[1]->get_data_at(row_num);
371
84
        this->data(place).add(columns, row_num, window,
372
84
                              string_to_window_funnel_mode(mode.to_string()));
373
84
    }
374
375
    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
376
11
               Arena&) const override {
377
11
        this->data(place).merge(this->data(rhs));
378
11
    }
379
380
2
    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
381
2
        this->data(place).write(buf);
382
2
    }
383
384
    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
385
2
                     Arena&) const override {
386
2
        this->data(place).read(buf);
387
2
    }
388
389
24
    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
390
        // place is essentially an AggregateDataPtr, passed as a ConstAggregateDataPtr.
391
24
        this->data(const_cast<AggregateDataPtr>(place)).sort();
392
24
        assert_cast<ColumnInt32&>(to).get_data().push_back(
393
24
                IAggregateFunctionDataHelper<WindowFunnelState,
394
24
                                             AggregateFunctionWindowFunnel>::data(place)
395
24
                        .get());
396
24
    }
397
398
protected:
399
    using IAggregateFunction::version;
400
};
401
} // namespace doris
402
403
#include "common/compile_check_end.h"