Coverage Report

Created: 2026-03-31 18:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/ungroupby_agg_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/common/ungroupby_agg_context.h"
19
20
#include "common/exception.h"
21
#include "exec/common/agg_context_utils.h"
22
#include "exec/common/util.hpp"
23
#include "exprs/vectorized_agg_fn.h"
24
#include "exprs/vslot_ref.h"
25
#include "runtime/descriptors.h"
26
#include "runtime/runtime_state.h"
27
28
namespace doris {
29
30
// ==================== Constructor / Destructor ====================
31
32
// Builds a context for aggregation without GROUP BY.
//
// All four arguments are handed straight to the AggContext base class:
//   agg_evaluators        - one evaluator per aggregate function
//   agg_state_offsets     - offset of each evaluator's state inside the state buffer
//   total_agg_state_size  - size passed to the arena when the state buffer is allocated
//   agg_state_alignment   - alignment passed to the arena when the state buffer is allocated
//
// No aggregate state is created here; that happens in _create_agg_state().
UngroupByAggContext::UngroupByAggContext(std::vector<AggFnEvaluator*> agg_evaluators,
                                         Sizes agg_state_offsets, size_t total_agg_state_size,
                                         size_t agg_state_alignment)
        : AggContext(std::move(agg_evaluators), std::move(agg_state_offsets), total_agg_state_size,
                     agg_state_alignment) {
    // Intentionally empty: the base class stores everything.
}
37
38
8
// Defaulted destructor. It does not itself call _destroy_agg_state(); in this file only
// close() does. NOTE(review): presumably callers must invoke close() before destruction so
// the aggregate states are destroyed - confirm against the base class and the call sites.
UngroupByAggContext::~UngroupByAggContext() = default;
39
40
// ==================== Profile ====================
41
42
8
// Registers this context's timers and memory counters on `profile`.
//
// Timers added: "BuildTime", "MergeTime", "DeserializeAndMergeTime", "GetResultsTime".
// A child profile named "MemoryUsage" is created and receives an "Arena" byte counter.
// `_memory_used_counter` is looked up on `profile` itself and may come back null;
// update_memusage() checks for that before using it.
void UngroupByAggContext::init_profile(RuntimeProfile* profile) {
    // Timers, registered in the same order they are reported.
    _build_timer = ADD_TIMER(profile, "BuildTime");
    _merge_timer = ADD_TIMER(profile, "MergeTime");
    _deserialize_data_timer = ADD_TIMER(profile, "DeserializeAndMergeTime");
    _get_results_timer = ADD_TIMER(profile, "GetResultsTime");

    // Memory accounting lives under a dedicated child profile.
    auto* memory_usage_profile = profile->create_child("MemoryUsage", true, true);
    _memory_used_counter = profile->get_counter("MemoryUsage");
    _memory_usage_arena = ADD_COUNTER(memory_usage_profile, "Arena", TUnit::BYTES);
}
52
53
// ==================== Agg state management ====================
54
55
7
// Allocates the single aggregate-state buffer and constructs every evaluator's state in it.
//
// The buffer comes from `_alloc_arena` using the configured total size and alignment; the
// state of evaluator i lives at `_agg_state_data + _agg_state_offsets[i]`.
//
// If constructing a state throws, the states that were already constructed are destroyed
// (in construction order) and the exception is rethrown; `_agg_state_created` stays false.
// Returns Status::OK() on success.
Status UngroupByAggContext::_create_agg_state() {
    DCHECK(!_agg_state_created);
    _agg_state_data = reinterpret_cast<AggregateDataPtr>(
            _alloc_arena.aligned_alloc(_total_agg_state_size, _agg_state_alignment));

    const size_t evaluator_count = _agg_evaluators.size();
    // Number of states successfully constructed so far; drives the rollback below.
    size_t created = 0;
    try {
        for (; created < evaluator_count; ++created) {
            _agg_evaluators[created]->create(_agg_state_data + _agg_state_offsets[created]);
        }
    } catch (...) {
        // Undo only what was fully constructed, then let the caller see the failure.
        for (size_t undo = 0; undo < created; ++undo) {
            _agg_evaluators[undo]->destroy(_agg_state_data + _agg_state_offsets[undo]);
        }
        throw;
    }

    _agg_state_created = true;
    return Status::OK();
}
74
75
8
void UngroupByAggContext::_destroy_agg_state() {
76
8
    if (!_agg_state_created) {
77
1
        return;
78
1
    }
79
15
    for (int i = 0; i < _agg_evaluators.size(); ++i) {
80
8
        _agg_evaluators[i]->function()->destroy(_agg_state_data + _agg_state_offsets[i]);
81
8
    }
82
7
    _agg_state_created = false;
83
7
}
84
85
8
void UngroupByAggContext::close() {
86
8
    _destroy_agg_state();
87
8
}
88
89
// ==================== Aggregation execution (Sink side) ====================
90
91
4
// Feeds one input block into every aggregate function (sink side, non-merge path).
//
// The aggregate state is created on the first call if it does not exist yet.
// `input_num_rows` is advanced by the block's row count before the evaluators run.
// Returns the first non-OK status produced by an evaluator, otherwise Status::OK().
Status UngroupByAggContext::update(Block* block) {
    // Lazy init: make sure the state exists before it is first used.
    if (!_agg_state_created) {
        RETURN_IF_ERROR(_create_agg_state());
    }
    input_num_rows += block->rows();

    DCHECK(_agg_state_data != nullptr);
    SCOPED_TIMER(_build_timer);
    // Reset before tracking so the value reflects only this call.
    memory_usage_last_executing = 0;
    SCOPED_PEAK_MEM(&memory_usage_last_executing);

    const size_t evaluator_count = _agg_evaluators.size();
    for (size_t idx = 0; idx < evaluator_count; ++idx) {
        AggregateDataPtr place = _agg_state_data + _agg_state_offsets[idx];
        RETURN_IF_ERROR(_agg_evaluators[idx]->execute_single_add(block, place, _agg_arena));
    }
    return Status::OK();
}
110
111
3
// Feeds one block into the aggregate state on the merge path (sink side).
//
// For an evaluator whose is_merge() is true, the column located via _get_slot_column_id()
// is deserialized and merged into the state. Any other evaluator processes the block with
// execute_single_add(), exactly as update() does.
//
// The aggregate state is created on the first call if needed, and `input_num_rows` is
// advanced by the block's row count. Returns the first non-OK status, else Status::OK().
Status UngroupByAggContext::merge(Block* block) {
    if (!_agg_state_created) {
        RETURN_IF_ERROR(_create_agg_state());
    }
    input_num_rows += block->rows();

    SCOPED_TIMER(_merge_timer);
    DCHECK(_agg_state_data != nullptr);
    // Reset before tracking so the value reflects only this call.
    memory_usage_last_executing = 0;
    SCOPED_PEAK_MEM(&memory_usage_last_executing);

    const size_t evaluator_count = _agg_evaluators.size();
    for (size_t idx = 0; idx < evaluator_count; ++idx) {
        AggFnEvaluator* evaluator = _agg_evaluators[idx];
        AggregateDataPtr place = _agg_state_data + _agg_state_offsets[idx];

        if (!evaluator->is_merge()) {
            RETURN_IF_ERROR(evaluator->execute_single_add(block, place, _agg_arena));
            continue;
        }

        const int col_id = _get_slot_column_id(evaluator);
        auto column = block->get_by_position(col_id).column;

        // Timed separately from the enclosing merge timer; covers only the call below.
        SCOPED_TIMER(_deserialize_data_timer);
        evaluator->function()->deserialize_and_merge_from_column(place, *column, _agg_arena);
    }
    return Status::OK();
}
137
138
// ==================== Result output (Source side) ====================
139
140
2
// Emits the aggregate state in serialized form (source side).
//
// `block` is always cleared and `*eos` is always set to true on success.
// When no input row was ever seen (`input_num_rows == 0`) the block is left empty.
// Otherwise the block gets one column per evaluator, typed with the function's serialized
// type and filled by serialize_without_key_to_column(). Column names are empty strings.
// `state` is not used by this function.
Status UngroupByAggContext::serialize(RuntimeState* state, Block* block, bool* eos) {
    SCOPED_TIMER(_get_results_timer);

    // Ensure agg state exists even if no data flowed through the sink.
    if (!_agg_state_created) {
        RETURN_IF_ERROR(_create_agg_state());
    }

    // Both outcomes start from a cleared block.
    block->clear();

    // If no data was ever fed, return empty result.
    if (UNLIKELY(input_num_rows == 0)) {
        *eos = true;
        return Status::OK();
    }

    DCHECK(_agg_state_data != nullptr);
    const size_t evaluator_count = _agg_evaluators.size();

    // One output column and one serialized type per aggregate function.
    MutableColumns serialized_columns(evaluator_count);
    std::vector<DataTypePtr> serialized_types(evaluator_count);
    for (size_t idx = 0; idx < evaluator_count; ++idx) {
        serialized_types[idx] = _agg_evaluators[idx]->function()->get_serialized_type();
        serialized_columns[idx] = _agg_evaluators[idx]->function()->create_serialize_column();
    }

    for (size_t idx = 0; idx < evaluator_count; ++idx) {
        _agg_evaluators[idx]->function()->serialize_without_key_to_column(
                _agg_state_data + _agg_state_offsets[idx], *serialized_columns[idx]);
    }

    // Give the block its schema first (null columns, unnamed), then install the data.
    ColumnsWithTypeAndName schema;
    schema.reserve(evaluator_count);
    for (const auto& serialized_type : serialized_types) {
        schema.push_back({nullptr, serialized_type, ""});
    }
    *block = Block(schema);

    block->set_columns(std::move(serialized_columns));
    *eos = true;
    return Status::OK();
}
185
186
5
void UngroupByAggContext::set_finalize_output(const RowDescriptor& row_desc) {
187
5
    _finalize_schema = VectorizedUtils::create_columns_with_type_and_name(row_desc);
188
5
}
189
190
5
// Produces the final aggregate result (source side).
//
// The output block is reset to `_finalize_schema` (set by set_finalize_output()) and filled
// with one column per evaluator, built from the function's return type and populated by
// insert_result_info(). `*eos` is set to true on success. `state` is not used.
//
// Each produced column is then reconciled with the schema type at the same position:
//   - identical types pass through untouched;
//   - for non-ARRAY schema types the only accepted difference is "schema is nullable,
//     result is not, and the nested types are equal"; anything else is an InternalError;
//   - a non-nullable result under a nullable schema type is wrapped with make_nullable().
//
// Returns InternalError when the schema and the evaluators disagree on the number of
// columns or on a column type; otherwise Status::OK().
Status UngroupByAggContext::finalize(RuntimeState* state, Block* block, bool* eos) {
    // Ensure agg state exists even if no data flowed through the sink.
    // Without GROUP BY, aggregation always produces one row (e.g., COUNT(*) → 0).
    if (!_agg_state_created) {
        RETURN_IF_ERROR(_create_agg_state());
    }
    DCHECK(_agg_state_data != nullptr);
    DCHECK(!_finalize_schema.empty());
    block->clear();

    *block = Block(_finalize_schema);
    const size_t agg_size = _agg_evaluators.size();

    MutableColumns columns(agg_size);
    std::vector<DataTypePtr> data_types(agg_size);
    for (size_t i = 0; i < agg_size; ++i) {
        data_types[i] = _agg_evaluators[i]->function()->get_return_type();
        columns[i] = data_types[i]->create_column();
    }

    for (size_t i = 0; i < agg_size; ++i) {
        _agg_evaluators[i]->insert_result_info(_agg_state_data + _agg_state_offsets[i],
                                               columns[i].get());
    }

    const auto& block_schema = block->get_columns_with_type_and_name();
    DCHECK_EQ(block_schema.size(), columns.size());
    // DCHECK_EQ is compiled out of release builds, and the loop below indexes `data_types`
    // and `columns` using the schema size as the bound. Fail cleanly instead of reading out
    // of bounds when the two disagree.
    if (UNLIKELY(block_schema.size() != columns.size())) {
        return Status::InternalError(
                "finalize schema column count not match aggregate function count, "
                "schema_columns={}, agg_functions={}",
                block_schema.size(), columns.size());
    }

    // True when no input row was ever fed into this context.
    const bool input_is_empty = input_num_rows == 0;
    for (size_t i = 0; i < block_schema.size(); ++i) {
        // Bind by reference: the block is not modified inside this loop, and this avoids a
        // shared-pointer copy per column.
        const auto& column_type = block_schema[i].type;
        if (column_type->equals(*data_types[i])) {
            continue;
        }

        if (column_type->get_primitive_type() != TYPE_ARRAY) {
            if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
                !remove_nullable(column_type)->equals(*data_types[i])) {
                return Status::InternalError(
                        "column_type not match data_types, column_type={}, data_types={}",
                        column_type->get_name(), data_types[i]->get_name());
            }
        }

        // Result of operator is nullable, but aggregate function result is not nullable.
        // This happens when: 1) no group by, 2) input empty, 3) all input columns not nullable.
        if (column_type->is_nullable() && !data_types[i]->is_nullable()) {
            ColumnPtr ptr = std::move(columns[i]);
            // Unless count, other aggregate functions on empty set should produce null.
            ptr = make_nullable(ptr, input_is_empty);
            columns[i] = ptr->assume_mutable();
        }
    }

    block->set_columns(std::move(columns));
    *eos = true;
    return Status::OK();
}
244
245
// ==================== Utilities ====================
246
247
7
void UngroupByAggContext::update_memusage() {
248
7
    int64_t arena_memory_usage = _agg_arena.size();
249
7
    if (_memory_used_counter) {
250
0
        COUNTER_SET(_memory_used_counter, arena_memory_usage);
251
0
    }
252
7
    if (_memory_usage_arena) {
253
7
        COUNTER_SET(_memory_usage_arena, arena_memory_usage);
254
7
    }
255
7
}
256
257
0
size_t UngroupByAggContext::memory_usage() const {
258
0
    return _agg_arena.size();
259
0
}
260
261
1
int UngroupByAggContext::_get_slot_column_id(const AggFnEvaluator* evaluator) {
262
1
    return agg_context_utils::get_slot_column_id(evaluator);
263
1
}
264
265
} // namespace doris