Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/vectorized_agg_fn.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/Types_types.h>
20
21
#include <cstddef>
22
#include <string>
23
#include <vector>
24
25
#include "common/be_mock_util.h"
26
#include "common/status.h"
27
#include "core/data_type/data_type.h"
28
#include "exec/sort/sort_description.h"
29
#include "exprs/aggregate/aggregate_function.h"
30
#include "exprs/vexpr_fwd.h"
31
#include "runtime/runtime_profile.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
class RuntimeState;
37
class SlotDescriptor;
38
class ObjectPool;
39
class RowDescriptor;
40
class TExpr;
41
class TExprNode;
42
class TSortInfo;
43
44
class Arena;
45
class Block;
46
class BufferWritable;
47
class IColumn;
48
49
class AggFnEvaluator {
50
public:
51
    ENABLE_FACTORY_CREATOR(AggFnEvaluator);
52
134
    MOCK_DEFINE(virtual) ~AggFnEvaluator() = default;
53
54
public:
55
    static Status create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info,
56
                         const bool without_key, const bool is_window_function,
57
                         AggFnEvaluator** result);
58
59
    Status prepare(RuntimeState* state, const RowDescriptor& desc,
60
                   const SlotDescriptor* intermediate_slot_desc,
61
                   const SlotDescriptor* output_slot_desc);
62
63
43
    void set_timer(RuntimeProfile::Counter* merge_timer, RuntimeProfile::Counter* expr_timer) {
64
43
        _merge_timer = merge_timer;
65
43
        _expr_timer = expr_timer;
66
43
    }
67
68
    Status open(RuntimeState* state);
69
70
    // create/destroy AGG Data
71
    void create(AggregateDataPtr place);
72
    void destroy(AggregateDataPtr place);
73
74
    // agg_function
75
    Status execute_single_add(Block* block, AggregateDataPtr place, Arena& arena);
76
77
    Status execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, Arena& arena,
78
                             bool agg_many = false);
79
80
    Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
81
                                      Arena& arena);
82
83
    Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
84
                                             const size_t num_rows, Arena& arena);
85
86
    void insert_result_info(AggregateDataPtr place, IColumn* column);
87
88
    void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, size_t offset,
89
                                IColumn* column, const size_t num_rows);
90
91
    void reset(AggregateDataPtr place);
92
93
82
    DataTypePtr& data_type() { return _data_type; }
94
95
1.04M
    const AggregateFunctionPtr& function() { return _function; }
96
    static std::string debug_string(const std::vector<AggFnEvaluator*>& exprs);
97
    std::string debug_string() const;
98
51
    bool is_merge() const { return _is_merge; }
99
7
    const VExprContextSPtrs& input_exprs_ctxs() const { return _input_exprs_ctxs; }
100
101
    static Status check_agg_fn_output(uint32_t key_size, const std::vector<AggFnEvaluator*>& agg_fn,
102
                                      const RowDescriptor& output_row_desc);
103
104
11
    void set_version(const int version) { _function->set_version(version); }
105
106
    AggFnEvaluator* clone(RuntimeState* state, ObjectPool* pool);
107
108
    bool is_blockable() const;
109
110
private:
111
    const TFunction _fn;
112
113
    const bool _is_merge;
114
    // We need this flag to distinguish between the two types of aggregation functions:
115
    // 1. executed without group by key (agg function used with window function is also regarded as this type)
116
    // 2. executed with group by key
117
    const bool _without_key;
118
119
    const bool _is_window_function;
120
121
    AggFnEvaluator(const TExprNode& desc, const bool without_key, const bool is_window_function);
122
    AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state);
123
124
#ifdef BE_TEST
125
    AggFnEvaluator(bool is_merge, bool without_key, const bool is_window_function)
126
71
            : _is_merge(is_merge),
127
71
              _without_key(without_key),
128
71
              _is_window_function(is_window_function) {};
129
#endif
130
    Status _calc_argument_columns(Block* block);
131
132
    DataTypes _argument_types_with_sort;
133
    DataTypes _real_argument_types;
134
135
    const SlotDescriptor* _intermediate_slot_desc = nullptr;
136
    const SlotDescriptor* _output_slot_desc = nullptr;
137
138
    RuntimeProfile::Counter* _merge_timer = nullptr;
139
    RuntimeProfile::Counter* _expr_timer = nullptr;
140
141
    // input context
142
    VExprContextSPtrs _input_exprs_ctxs;
143
144
    SortDescription _sort_description;
145
146
    DataTypePtr _data_type;
147
148
    AggregateFunctionPtr _function;
149
150
    std::string _expr_name;
151
152
    std::vector<const IColumn*> _agg_columns;
153
};
154
155
#include "common/compile_check_end.h"
156
} // namespace doris