Coverage Report

Created: 2026-04-14 12:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/vectorized_agg_fn.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/Types_types.h>
20
21
#include <cstddef>
22
#include <string>
23
#include <vector>
24
25
#include "common/be_mock_util.h"
26
#include "common/status.h"
27
#include "core/data_type/data_type.h"
28
#include "exec/sort/sort_description.h"
29
#include "exprs/aggregate/aggregate_function.h"
30
#include "exprs/vexpr_fwd.h"
31
#include "runtime/runtime_profile.h"
32
33
namespace doris {
34
35
// Forward declarations. In the part of this header visible here, these types
// are only used through pointers or references, so incomplete types suffice.
class RuntimeState;
36
class SlotDescriptor;
37
class ObjectPool;
38
class RowDescriptor;
39
class TExpr;
40
class TExprNode;
41
class TSortInfo;
42
43
class Arena;
44
class Block;
45
// NOTE(review): BufferWritable is not referenced by the class declared below.
class BufferWritable;
46
class IColumn;
47
48
// Evaluator for one aggregate function. It bundles the aggregate function
// object (`_function`), the input expression contexts (`_input_exprs_ctxs`),
// the argument / result data types and a sort description, all declared in
// the private section below. Most member functions are only declared here and
// are defined out of line.
class AggFnEvaluator {
49
public:
50
    // Project macro; presumably generates the factory helpers for this class
    // - TODO confirm against its definition.
    ENABLE_FACTORY_CREATOR(AggFnEvaluator);
51
509k
    // Defaulted destructor. MOCK_DEFINE(virtual) presumably makes it virtual
    // only in mock/test builds (see common/be_mock_util.h) - TODO confirm.
    MOCK_DEFINE(virtual) ~AggFnEvaluator() = default;
52
53
public:
54
    // Static factory. Reports success/failure through the returned Status and
    // hands the new evaluator back through the out-parameter `result`.
    // NOTE(review): ownership of `*result` (presumably `pool`) is not visible
    // from this header - confirm in the definition.
    static Status create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info,
55
                         const bool without_key, const bool is_window_function,
56
                         AggFnEvaluator** result);
57
58
    // Declared here, defined out of line. Takes the row descriptor plus the
    // intermediate and output slot descriptors; the class keeps members of the
    // same names (`_intermediate_slot_desc`, `_output_slot_desc`).
    Status prepare(RuntimeState* state, const RowDescriptor& desc,
59
                   const SlotDescriptor* intermediate_slot_desc,
60
                   const SlotDescriptor* output_slot_desc);
61
62
309k
    // Stores the two profile counters in `_merge_timer` and `_expr_timer`.
    // The pointers are stored as-is; no copy of the counters is made.
    void set_timer(RuntimeProfile::Counter* merge_timer, RuntimeProfile::Counter* expr_timer) {
63
309k
        _merge_timer = merge_timer;
64
309k
        _expr_timer = expr_timer;
65
309k
    }
66
67
    Status open(RuntimeState* state);
68
69
    // create/destroy AGG Data
70
    // Non-static overload of create(); it operates on the aggregate state at
    // `place` and is unrelated to the static factory above.
    void create(AggregateDataPtr place);
71
    void destroy(AggregateDataPtr place);
72
73
    // agg_function
74
    // Variant taking a single aggregate state `place` for the whole block.
    Status execute_single_add(Block* block, AggregateDataPtr place, Arena& arena);
75
76
    // Variant taking an array of aggregate states `places` and a byte/state
    // `offset`; `agg_many` defaults to false.
    // NOTE(review): exact meaning of `offset` and `agg_many` is defined in the
    // out-of-line implementation - confirm there.
    Status execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, Arena& arena,
77
                             bool agg_many = false);
78
79
    Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
80
                                      Arena& arena);
81
82
    // Writes into the caller-supplied mutable column `dst` for `num_rows` rows.
    Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
83
                                             const size_t num_rows, Arena& arena);
84
85
    // Single-state result insertion into `column`.
    void insert_result_info(AggregateDataPtr place, IColumn* column);
86
87
    // Vector form of insert_result_info(): takes a vector of states and the
    // number of rows to handle.
    void insert_result_info_vec(const std::vector<AggregateDataPtr>& place, size_t offset,
88
                                IColumn* column, const size_t num_rows);
89
90
    void reset(AggregateDataPtr place);
91
92
15.8k
    // Returns a mutable reference to `_data_type`, so callers can replace it.
    DataTypePtr& data_type() { return _data_type; }
93
94
4.34M
    // Returns `_function` by const reference. Note the accessor itself is not
    // const-qualified, so it cannot be called on a const AggFnEvaluator.
    const AggregateFunctionPtr& function() { return _function; }
95
    // Static overload: builds a debug string for a list of evaluators.
    static std::string debug_string(const std::vector<AggFnEvaluator*>& exprs);
96
    // Instance overload: debug string for this evaluator.
    std::string debug_string() const;
97
297k
    // Returns the `_is_merge` flag fixed at construction time.
    bool is_merge() const { return _is_merge; }
98
117k
    // Read-only access to the input expression contexts.
    const VExprContextSPtrs& input_exprs_ctxs() const { return _input_exprs_ctxs; }
99
100
    // NOTE(review): uint32_t is used here, but <cstdint> is not among the
    // includes listed at the top of this header; presumably it arrives
    // transitively - confirm.
    static Status check_agg_fn_output(uint32_t key_size, const std::vector<AggFnEvaluator*>& agg_fn,
101
                                      const RowDescriptor& output_row_desc);
102
103
187k
    // Forwards `version` to `_function`. `_function` is dereferenced without a
    // null check, so it must already be set when this is called.
    void set_version(const int version) { _function->set_version(version); }
104
105
    // NOTE(review): presumably implemented via the private
    // (AggFnEvaluator&, RuntimeState*) constructor below - confirm.
    AggFnEvaluator* clone(RuntimeState* state, ObjectPool* pool);
106
107
    bool is_blockable() const;
108
109
private:
110
    // Const member; not listed in the BE_TEST constructor's initializer list,
    // so that constructor default-initializes it.
    const TFunction _fn;
111
112
    const bool _is_merge;
113
    // We need this flag to distinguish between the two types of aggregation functions:
114
    // 1. executed without group by key (agg function used with window function is also regarded as this type)
115
    // 2. executed with group by key
116
    const bool _without_key;
117
118
    const bool _is_window_function;
119
120
    // Private constructors: instances cannot be built directly from outside
    // the class (the public entry points are the static create() and clone()).
    AggFnEvaluator(const TExprNode& desc, const bool without_key, const bool is_window_function);
121
    // Takes the source evaluator by non-const reference.
    AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state);
122
123
#ifdef BE_TEST
124
    // Test-only constructor (compiled only when BE_TEST is defined). It sets
    // just the three const flags, in the same order as they are declared;
    // every other member keeps its default / in-class initializer.
    AggFnEvaluator(bool is_merge, bool without_key, const bool is_window_function)
125
            : _is_merge(is_merge),
126
              _without_key(without_key),
127
              _is_window_function(is_window_function) {};
128
#endif
129
    Status _calc_argument_columns(Block* block);
130
131
    DataTypes _argument_types_with_sort;
132
    DataTypes _real_argument_types;
133
134
    // Slot descriptors are held as raw const pointers and default to nullptr.
    const SlotDescriptor* _intermediate_slot_desc = nullptr;
135
    const SlotDescriptor* _output_slot_desc = nullptr;
136
137
    // Profile counters assigned by set_timer(); nullptr until then.
    RuntimeProfile::Counter* _merge_timer = nullptr;
138
    RuntimeProfile::Counter* _expr_timer = nullptr;
139
140
    // input context
141
    VExprContextSPtrs _input_exprs_ctxs;
142
143
    SortDescription _sort_description;
144
145
    // Exposed (mutably) through data_type().
    DataTypePtr _data_type;
146
147
    // Exposed through function(); dereferenced by set_version().
    AggregateFunctionPtr _function;
148
149
    std::string _expr_name;
150
151
    // Raw pointers to columns.
    // NOTE(review): presumably filled by _calc_argument_columns() - confirm.
    std::vector<const IColumn*> _agg_columns;
152
};
153
154
} // namespace doris