Coverage Report

Created: 2026-03-27 10:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/ungroupby_agg_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/common/ungroupby_agg_context.h"
19
20
#include "common/exception.h"
21
#include "exec/common/agg_context_utils.h"
22
#include "exec/common/util.hpp"
23
#include "exprs/vectorized_agg_fn.h"
24
#include "exprs/vslot_ref.h"
25
#include "runtime/descriptors.h"
26
#include "runtime/runtime_state.h"
27
28
namespace doris {
29
30
// ==================== Constructor / Destructor ====================
31
32
UngroupByAggContext::UngroupByAggContext(std::vector<AggFnEvaluator*> agg_evaluators,
33
                                         Sizes agg_state_offsets, size_t total_agg_state_size,
34
                                         size_t agg_state_alignment)
35
67.9k
        : _agg_evaluators(std::move(agg_evaluators)),
36
67.9k
          _agg_state_offsets(std::move(agg_state_offsets)),
37
67.9k
          _total_agg_state_size(total_agg_state_size),
38
67.9k
          _agg_state_alignment(agg_state_alignment) {}
39
40
67.8k
UngroupByAggContext::~UngroupByAggContext() = default;
41
42
// ==================== Profile ====================
43
44
67.8k
void UngroupByAggContext::init_profile(RuntimeProfile* profile) {
45
67.8k
    _build_timer = ADD_TIMER(profile, "BuildTime");
46
67.8k
    _merge_timer = ADD_TIMER(profile, "MergeTime");
47
67.8k
    _deserialize_data_timer = ADD_TIMER(profile, "DeserializeAndMergeTime");
48
67.8k
    _get_results_timer = ADD_TIMER(profile, "GetResultsTime");
49
50
67.8k
    auto* memory_usage =
51
67.8k
            profile->create_child("MemoryUsage", true, true);
52
67.8k
    _memory_used_counter = profile->get_counter("MemoryUsage");
53
67.8k
    _memory_usage_arena = ADD_COUNTER(memory_usage, "Arena", TUnit::BYTES);
54
67.8k
}
55
56
// ==================== Agg state management ====================
57
58
67.9k
Status UngroupByAggContext::_create_agg_state() {
59
67.9k
    DCHECK(!_agg_state_created);
60
67.9k
    _agg_state_data = reinterpret_cast<AggregateDataPtr>(
61
67.9k
            _alloc_arena.aligned_alloc(_total_agg_state_size, _agg_state_alignment));
62
63
263k
    for (int i = 0; i < _agg_evaluators.size(); ++i) {
64
196k
        try {
65
196k
            _agg_evaluators[i]->create(_agg_state_data + _agg_state_offsets[i]);
66
196k
        } catch (...) {
67
17
            for (int j = 0; j < i; ++j) {
68
0
                _agg_evaluators[j]->destroy(_agg_state_data + _agg_state_offsets[j]);
69
0
            }
70
17
            throw;
71
17
        }
72
196k
    }
73
74
67.9k
    _agg_state_created = true;
75
67.9k
    return Status::OK();
76
67.9k
}
77
78
67.8k
void UngroupByAggContext::_destroy_agg_state() {
79
67.8k
    if (!_agg_state_created) {
80
86
        return;
81
86
    }
82
263k
    for (int i = 0; i < _agg_evaluators.size(); ++i) {
83
195k
        _agg_evaluators[i]->function()->destroy(_agg_state_data + _agg_state_offsets[i]);
84
195k
    }
85
67.7k
    _agg_state_created = false;
86
67.7k
}
87
88
67.8k
void UngroupByAggContext::close() {
89
67.8k
    _destroy_agg_state();
90
67.8k
}
91
92
// ==================== Aggregation execution (Sink side) ====================
93
94
68.5k
Status UngroupByAggContext::execute(Block* block) {
95
    // Create agg state on first call (lazy init to match original behavior, which creates
96
    // state in open() - here we ensure it's created before first use).
97
68.5k
    if (!_agg_state_created) {
98
26.0k
        RETURN_IF_ERROR(_create_agg_state());
99
26.0k
    }
100
101
68.5k
    DCHECK(_agg_state_data != nullptr);
102
68.5k
    SCOPED_TIMER(_build_timer);
103
68.5k
    memory_usage_last_executing = 0;
104
68.5k
    SCOPED_PEAK_MEM(&memory_usage_last_executing);
105
106
266k
    for (int i = 0; i < _agg_evaluators.size(); ++i) {
107
197k
        RETURN_IF_ERROR(_agg_evaluators[i]->execute_single_add(
108
197k
                block, _agg_state_data + _agg_state_offsets[i], _agg_arena));
109
197k
    }
110
68.5k
    return Status::OK();
111
68.5k
}
112
113
27.8k
Status UngroupByAggContext::merge(Block* block) {
114
27.8k
    if (!_agg_state_created) {
115
21.7k
        RETURN_IF_ERROR(_create_agg_state());
116
21.7k
    }
117
118
27.8k
    SCOPED_TIMER(_merge_timer);
119
27.8k
    DCHECK(_agg_state_data != nullptr);
120
27.8k
    memory_usage_last_executing = 0;
121
27.8k
    SCOPED_PEAK_MEM(&memory_usage_last_executing);
122
123
118k
    for (int i = 0; i < _agg_evaluators.size(); ++i) {
124
90.1k
        if (_agg_evaluators[i]->is_merge()) {
125
88.7k
            int col_id = _get_slot_column_id(_agg_evaluators[i]);
126
88.7k
            auto column = block->get_by_position(col_id).column;
127
128
88.7k
            SCOPED_TIMER(_deserialize_data_timer);
129
88.7k
            _agg_evaluators[i]->function()->deserialize_and_merge_from_column(
130
88.7k
                    _agg_state_data + _agg_state_offsets[i], *column, _agg_arena);
131
88.7k
        } else {
132
1.39k
            RETURN_IF_ERROR(_agg_evaluators[i]->execute_single_add(
133
1.39k
                    block, _agg_state_data + _agg_state_offsets[i], _agg_arena));
134
1.39k
        }
135
90.1k
    }
136
27.8k
    return Status::OK();
137
27.8k
}
138
139
// ==================== Result output (Source side) ====================
140
141
44.4k
Status UngroupByAggContext::get_serialized_result(RuntimeState* state, Block* block, bool* eos) {
142
44.4k
    SCOPED_TIMER(_get_results_timer);
143
144
    // Ensure agg state exists even if no data flowed through the sink.
145
44.4k
    if (!_agg_state_created) {
146
18.0k
        RETURN_IF_ERROR(_create_agg_state());
147
18.0k
    }
148
149
    // If no data was ever fed, return empty result.
150
44.4k
    if (UNLIKELY(input_num_rows == 0)) {
151
18.0k
        *eos = true;
152
18.0k
        return Status::OK();
153
18.0k
    }
154
26.4k
    block->clear();
155
156
26.4k
    DCHECK(_agg_state_data != nullptr);
157
26.4k
    size_t agg_size = _agg_evaluators.size();
158
159
26.4k
    MutableColumns value_columns(agg_size);
160
26.4k
    std::vector<DataTypePtr> data_types(agg_size);
161
162
113k
    for (int i = 0; i < agg_size; ++i) {
163
87.3k
        data_types[i] = _agg_evaluators[i]->function()->get_serialized_type();
164
87.3k
        value_columns[i] = _agg_evaluators[i]->function()->create_serialize_column();
165
87.3k
    }
166
167
113k
    for (int i = 0; i < agg_size; ++i) {
168
87.3k
        _agg_evaluators[i]->function()->serialize_without_key_to_column(
169
87.3k
                _agg_state_data + _agg_state_offsets[i], *value_columns[i]);
170
87.3k
    }
171
172
26.4k
    {
173
26.4k
        ColumnsWithTypeAndName data_with_schema;
174
113k
        for (int i = 0; i < agg_size; ++i) {
175
87.3k
            ColumnWithTypeAndName column_with_schema = {nullptr, data_types[i], ""};
176
87.3k
            data_with_schema.push_back(std::move(column_with_schema));
177
87.3k
        }
178
26.4k
        *block = Block(data_with_schema);
179
26.4k
    }
180
181
26.4k
    block->set_columns(std::move(value_columns));
182
26.4k
    *eos = true;
183
26.4k
    return Status::OK();
184
44.4k
}
185
186
Status UngroupByAggContext::get_finalized_result(RuntimeState* state, Block* block, bool* eos,
187
23.4k
                                                  const RowDescriptor& row_desc) {
188
    // Ensure agg state exists even if no data flowed through the sink.
189
    // Without GROUP BY, aggregation always produces one row (e.g., COUNT(*) → 0).
190
23.4k
    if (!_agg_state_created) {
191
2.18k
        RETURN_IF_ERROR(_create_agg_state());
192
2.18k
    }
193
23.4k
    DCHECK(_agg_state_data != nullptr);
194
23.4k
    block->clear();
195
196
23.4k
    *block = VectorizedUtils::create_empty_columnswithtypename(row_desc);
197
23.4k
    size_t agg_size = _agg_evaluators.size();
198
199
23.4k
    MutableColumns columns(agg_size);
200
23.4k
    std::vector<DataTypePtr> data_types(agg_size);
201
106k
    for (int i = 0; i < agg_size; ++i) {
202
83.2k
        data_types[i] = _agg_evaluators[i]->function()->get_return_type();
203
83.2k
        columns[i] = data_types[i]->create_column();
204
83.2k
    }
205
206
106k
    for (int i = 0; i < agg_size; ++i) {
207
83.2k
        auto column = columns[i].get();
208
83.2k
        _agg_evaluators[i]->insert_result_info(_agg_state_data + _agg_state_offsets[i], column);
209
83.2k
    }
210
211
23.4k
    const auto& block_schema = block->get_columns_with_type_and_name();
212
23.4k
    DCHECK_EQ(block_schema.size(), columns.size());
213
106k
    for (int i = 0; i < block_schema.size(); ++i) {
214
83.2k
        const auto column_type = block_schema[i].type;
215
83.2k
        if (!column_type->equals(*data_types[i])) {
216
15.1k
            if (column_type->get_primitive_type() != TYPE_ARRAY) {
217
15.1k
                if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
218
15.1k
                    !remove_nullable(column_type)->equals(*data_types[i])) {
219
0
                    return Status::InternalError(
220
0
                            "column_type not match data_types, column_type={}, data_types={}",
221
0
                            column_type->get_name(), data_types[i]->get_name());
222
0
                }
223
15.1k
            }
224
225
            // Result of operator is nullable, but aggregate function result is not nullable.
226
            // This happens when: 1) no group by, 2) input empty, 3) all input columns not nullable.
227
15.1k
            if (column_type->is_nullable() && !data_types[i]->is_nullable()) {
228
15.1k
                ColumnPtr ptr = std::move(columns[i]);
229
                // Unless count, other aggregate functions on empty set should produce null.
230
15.1k
                ptr = make_nullable(ptr, input_num_rows == 0);
231
15.1k
                columns[i] = ptr->assume_mutable();
232
15.1k
            }
233
15.1k
        }
234
83.2k
    }
235
236
23.4k
    block->set_columns(std::move(columns));
237
23.4k
    *eos = true;
238
23.4k
    return Status::OK();
239
23.4k
}
240
241
// ==================== Utilities ====================
242
243
96.4k
void UngroupByAggContext::update_memusage() {
244
96.4k
    int64_t arena_memory_usage = _agg_arena.size();
245
96.4k
    if (_memory_used_counter) {
246
0
        COUNTER_SET(_memory_used_counter, arena_memory_usage);
247
0
    }
248
96.4k
    if (_memory_usage_arena) {
249
96.4k
        COUNTER_SET(_memory_usage_arena, arena_memory_usage);
250
96.4k
    }
251
96.4k
}
252
253
88.7k
int UngroupByAggContext::_get_slot_column_id(const AggFnEvaluator* evaluator) {
254
88.7k
    return agg_context_utils::get_slot_column_id(evaluator);
255
88.7k
}
256
257
} // namespace doris