Coverage Report

Created: 2026-03-30 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/groupby_agg_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <queue>
21
22
#include "common/status.h"
23
#include "core/block/block.h"
24
#include "exec/common/agg_context.h"
25
#include "exec/common/agg_utils.h"
26
#include "runtime/runtime_profile.h"
27
28
namespace doris {
29
30
class AggFnEvaluator;
31
class RuntimeState;
32
class VExprContext;
33
using VExprContextSPtr = std::shared_ptr<VExprContext>;
34
using VExprContextSPtrs = std::vector<VExprContextSPtr>;
35
36
/// GroupByAggContext encapsulates all hash-table-based aggregation logic for GROUP BY queries.
37
/// It is shared between AggSinkLocalState (write path) and AggLocalState (read path) in
38
/// 2-phase aggregation, or owned locally by StreamingAggLocalState in 1-phase streaming agg.
39
///
40
/// InlineCountAggContext (subclass) overrides virtual methods to implement the
41
/// inline-count optimization (storing UInt64 count directly in the hash table mapped slot
42
/// instead of a full aggregate state).
43
class GroupByAggContext : public AggContext {
44
public:
45
    GroupByAggContext(std::vector<AggFnEvaluator*> agg_evaluators,
46
                      VExprContextSPtrs groupby_expr_ctxs, Sizes agg_state_offsets,
47
                      size_t total_agg_state_size, size_t agg_state_alignment, bool is_first_phase);
48
49
    virtual ~GroupByAggContext();
50
51
    // ==================== Aggregation execution (Sink side) ====================
52
53
    /// Update mode: evaluate groupby exprs → emplace → execute_batch_add
54
    Status update(Block* block) override;
55
56
    /// Emplace + execute_batch_add with pre-evaluated key columns.
57
    /// InlineCountAggContext overrides to only emplace (count++ is done internally).
58
    /// Used by StreamingAgg which evaluates key expressions separately.
59
    virtual Status emplace_and_forward(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
60
                                       uint32_t num_rows, Block* block, bool expand_hash_table);
61
62
    /// Merge mode: evaluate groupby exprs → emplace → deserialize_and_merge
63
    Status merge(Block* block) override;
64
65
    /// Merge for spill restore (keys already materialized as first N columns of block)
66
/// (declared via AggContext::merge_for_spill override below)
67
68
    // ==================== Result output (Source side) ====================
69
70
    /// Serialize mode output (for non-finalize path and StreamingAgg)
71
    Status serialize(RuntimeState* state, Block* block, bool* eos) override;
72
73
    /// Finalize mode output (for AggSource finalize path).
74
    /// Caller must call set_finalize_output() before the first call.
75
    Status finalize(RuntimeState* state, Block* block, bool* eos) override;
76
77
    /// Store output schema for finalize(). Converts RowDescriptor to ColumnsWithTypeAndName.
78
    void set_finalize_output(const RowDescriptor& row_desc) override;
79
80
    // ==================== Agg state management ====================
81
82
    virtual Status create_agg_state(AggregateDataPtr data);
83
    void close() override;
84
85
    // ==================== Utilities ====================
86
87
    size_t hash_table_size() const override;
88
    size_t memory_usage() const override;
89
    void update_memusage() override;
90
    void init_hash_method();
91
    /// Initialize the AggregateDataContainer after hash method is set up.
92
    /// Must be called after init_hash_method().
93
    virtual void init_agg_data_container();
94
    Status reset_hash_table() override;
95
96
    /// Sink operator calls this to register sink-side profile counters.
97
    void init_sink_profile(RuntimeProfile* profile);
98
    /// Source operator calls this to register source-side profile counters.
99
    void init_source_profile(RuntimeProfile* profile) override;
100
101
    /// Evaluate groupby expressions on block, filling key_columns and optionally key_locs.
102
    /// Handles convert_to_full_column_if_const and replace_float_special_values.
103
    Status evaluate_groupby_keys(Block* block, ColumnRawPtrs& key_columns,
104
                                 std::vector<int>* key_locs = nullptr);
105
106
    // ==================== Sort limit ====================
107
108
    void build_limit_heap(size_t hash_table_size);
109
    bool do_limit_filter(size_t num_rows, const ColumnRawPtrs& key_columns);
110
    void refresh_top_limit(size_t row_id, const ColumnRawPtrs& key_columns);
111
    /// Update limit heap with new top-N candidates from passthrough path.
112
    /// Finds the first row where cmp_res==1 && need_computes[i], inserts into heap, then breaks.
113
    void add_limit_heap_top(ColumnRawPtrs& key_columns, size_t rows);
114
115
    /// Emplace with sort-limit filtering. Returns true if aggregation should proceed.
116
    /// When key_locs is provided, re-fetches key_columns from block after filtering.
117
    bool emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block,
118
                                       const std::vector<int>* key_locs, ColumnRawPtrs& key_columns,
119
                                       uint32_t num_rows);
120
121
    // ==================== Streaming preagg support ====================
122
123
    /// Check if preagg hash table should expand based on reduction statistics.
124
    /// Updates internal _should_expand_hash_table flag.
125
    bool should_expand_preagg_hash_table(int64_t input_rows, int64_t returned_rows,
126
                                         bool is_single_backend);
127
128
    /// Check if preagg should be skipped (passthrough mode).
129
    /// mem_limit: spill mem limit, 0 means no limit. Returns true if should skip.
130
    bool should_skip_preagg(size_t rows, size_t mem_limit, int64_t input_rows,
131
                            int64_t returned_rows, bool is_single_backend);
132
133
    /// Passthrough serialize for streaming agg: serialize agg values directly without aggregating.
134
    Status streaming_serialize_passthrough(Block* in_block, Block* out_block,
135
                                           ColumnRawPtrs& key_columns, uint32_t rows,
136
                                           bool mem_reuse);
137
138
    /// Preagg emplace + forward using internal _places buffer and _should_expand_hash_table.
139
    Status preagg_emplace_and_forward(ColumnRawPtrs& key_columns, uint32_t num_rows, Block* block);
140
141
    /// Emplace with sort-limit + execute_batch_add using internal _places buffer.
142
    Status emplace_and_forward_limit(Block* block, ColumnRawPtrs& key_columns, uint32_t num_rows);
143
144
    /// Query whether hash table should be expanded (for streaming preagg).
145
0
    bool should_expand_hash_table() const { return _should_expand_hash_table; }
146
147
    // ==================== Data accessors ====================
148
149
40
    AggregatedDataVariants* hash_table_data() { return _hash_table_data.get(); }
150
1.34M
    AggregateDataContainer* agg_data_container() { return _agg_data_container.get(); }
151
37.0k
    const VExprContextSPtrs& groupby_expr_ctxs() const { return _groupby_expr_ctxs; }
152
8
    PaddedPODArray<uint8_t>& need_computes() { return _need_computes; }
153
154
    // Sort limit public state
155
    int64_t limit = -1;
156
    bool do_sort_limit = false;
157
    bool reach_limit = false;
158
    std::vector<int> order_directions;
159
    std::vector<int> null_directions;
160
161
    // Limit check configuration (set by operator during open)
162
    bool should_limit_output = false;
163
    bool enable_spill = false;
164
165
    // Key columns that need nullable wrapping in output (left/full join).
166
    // When non-empty, mem_reuse must be disabled in get_*_results to avoid
167
    // column type mismatch after make_nullable_output_key transforms the block.
168
    std::vector<size_t> make_nullable_keys;
169
170
    // Sink-side profile counters (public for operator-level SCOPED_TIMER access)
171
113k
    RuntimeProfile::Counter* expr_timer() const { return _expr_timer; }
172
    RuntimeProfile::Counter* hash_table_compute_timer() const { return _hash_table_compute_timer; }
173
    RuntimeProfile::Counter* hash_table_emplace_timer() const { return _hash_table_emplace_timer; }
174
    RuntimeProfile::Counter* hash_table_input_counter() const { return _hash_table_input_counter; }
175
176
    // Source-side profile counters
177
0
    RuntimeProfile::Counter* hash_table_iterate_timer() const { return _hash_table_iterate_timer; }
178
0
    RuntimeProfile::Counter* insert_keys_to_column_timer() const {
179
0
        return _insert_keys_to_column_timer;
180
0
    }
181
0
    RuntimeProfile::Counter* insert_values_to_column_timer() const {
182
0
        return _insert_values_to_column_timer;
183
0
    }
184
0
    RuntimeProfile::Counter* hash_table_limit_compute_timer() const {
185
0
        return _hash_table_limit_compute_timer;
186
0
    }
187
188
    // For spill: estimate memory needed
189
    size_t get_reserve_mem_size(RuntimeState* state) const override;
190
191
    /// Estimate memory needed to merge `rows` rows into the hash table.
192
    size_t estimated_memory_for_merging(size_t rows) const override;
193
194
    /// Apply limit/sort-limit filtering on the output block.
195
    /// Returns true if the caller should apply reached_limit() truncation.
196
    bool apply_limit_filter(Block* block) override;
197
198
    /// Merge for spill restore (keys already materialized as first N columns of block).
199
    Status merge_for_spill(Block* block) override;
200
201
protected:
202
    // ==================== Internal hash table operations ====================
203
204
    /// Insert keys into the hash table, fill places array. New keys get agg state created.
205
    /// Counter parameters allow callers to direct timing to sink or source profile counters.
206
    virtual void emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
207
                                         uint32_t num_rows,
208
                                         RuntimeProfile::Counter* hash_table_compute_timer,
209
                                         RuntimeProfile::Counter* hash_table_emplace_timer,
210
                                         RuntimeProfile::Counter* hash_table_input_counter);
211
212
    /// Find existing keys in hash table (used when reach_limit && !do_sort_limit).
213
    void find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
214
                            uint32_t num_rows);
215
216
    virtual void destroy_agg_state(AggregateDataPtr data);
217
218
    /// Convert columns at specified positions to nullable.
219
    static void make_nullable_output_key(Block* block,
220
                                         const std::vector<size_t>& make_nullable_keys);
221
222
    /// Get the column id from an evaluator's input expression (used in merge path).
223
    /// Only valid for 1st phase evaluators with a single SlotRef input.
224
    static int get_slot_column_id(const AggFnEvaluator* evaluator);
225
226
    // Core hash table data
227
    AggregatedDataVariantsUPtr _hash_table_data;
228
    std::unique_ptr<AggregateDataContainer> _agg_data_container;
229
230
    // GroupBy-specific metadata
231
    VExprContextSPtrs _groupby_expr_ctxs;
232
    bool _is_first_phase;
233
234
    // Working buffers
235
    PODArray<AggregateDataPtr> _places;
236
    std::vector<char> _deserialize_buffer;
237
    std::vector<AggregateDataPtr> _values;
238
239
    // Streaming preagg state
240
    bool _should_expand_hash_table = true;
241
242
    // Finalize output schema (set by set_finalize_output, used by finalize)
243
    ColumnsWithTypeAndName _finalize_schema;
244
245
    // Sort limit state
246
    MutableColumns _limit_columns;
247
    int _limit_columns_min = -1;
248
    PaddedPODArray<uint8_t> _need_computes;
249
    std::vector<uint8_t> _cmp_res;
250
251
    struct HeapLimitCursor {
252
        HeapLimitCursor(int row_id, MutableColumns& limit_columns,
253
                        std::vector<int>& order_directions, std::vector<int>& null_directions)
254
166k
                : _row_id(row_id),
255
166k
                  _limit_columns(limit_columns),
256
166k
                  _order_directions(order_directions),
257
166k
                  _null_directions(null_directions) {}
258
259
        HeapLimitCursor(const HeapLimitCursor& other) = default;
260
261
        HeapLimitCursor(HeapLimitCursor&& other) noexcept
262
1.01M
                : _row_id(other._row_id),
263
1.01M
                  _limit_columns(other._limit_columns),
264
1.01M
                  _order_directions(other._order_directions),
265
1.01M
                  _null_directions(other._null_directions) {}
266
267
        // Only copy _row_id. The three reference members (_limit_columns, _order_directions,
268
        // _null_directions) are not rebindable and all HeapLimitCursor instances reference the
269
        // same GroupByAggContext members, so skipping them is correct.
270
0
        HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept {
271
0
            _row_id = other._row_id;
272
0
            return *this;
273
0
        }
274
275
2.05M
        HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept {
276
2.05M
            _row_id = other._row_id;
277
2.05M
            return *this;
278
2.05M
        }
279
280
1.94M
        bool operator<(const HeapLimitCursor& rhs) const {
281
2.07M
            for (int i = 0; i < _limit_columns.size(); ++i) {
282
2.07M
                const auto& col = _limit_columns[i];
283
2.07M
                auto res = col->compare_at(_row_id, rhs._row_id, *col, _null_directions[i]) *
284
2.07M
                           _order_directions[i];
285
2.07M
                if (res < 0) {
286
1.01M
                    return true;
287
1.06M
                } else if (res > 0) {
288
1.04M
                    return false;
289
1.04M
                }
290
2.07M
            }
291
18.4E
            return false;
292
1.94M
        }
293
294
        int _row_id;
295
        MutableColumns& _limit_columns;
296
        std::vector<int>& _order_directions;
297
        std::vector<int>& _null_directions;
298
    };
299
300
    std::priority_queue<HeapLimitCursor> _limit_heap;
301
    MutableColumns _get_keys_hash_table();
302
303
    template <bool limit, bool for_spill = false>
304
    Status _merge_with_serialized_key_helper(Block* block);
305
306
    // ---- Evaluator loop helpers ----
307
308
    /// execute_batch_add for all evaluators.
309
    Status _execute_batch_add_evaluators(Block* block, AggregateDataPtr* places,
310
                                         bool expand_hash_table = false);
311
312
    /// execute_batch_add_selected for all evaluators (limit && !sort_limit path).
313
    Status _execute_batch_add_selected_evaluators(Block* block, AggregateDataPtr* places);
314
315
    /// Merge-mode evaluator loop using deserialize_and_merge_vec_selected (limit && !sort_limit).
316
    Status _merge_evaluators_selected(Block* block, size_t rows,
317
                                      RuntimeProfile::Counter* deser_timer);
318
319
    /// Merge-mode evaluator loop using deserialize_and_merge_vec.
320
    template <bool for_spill>
321
    Status _merge_evaluators(Block* block, size_t rows, RuntimeProfile::Counter* deser_timer);
322
323
    /// Serialize agg values into value_columns (for serialize()).
324
    void _serialize_agg_values(MutableColumns& value_columns, DataTypes& value_data_types,
325
                               Block* block, bool mem_reuse, size_t key_size, uint32_t num_rows);
326
327
    /// Insert finalized agg results into value_columns for all evaluators.
328
    void _insert_finalized_values(MutableColumns& value_columns, uint32_t num_rows);
329
330
    /// Insert single-row finalized result (for null key).
331
    void _insert_finalized_single(AggregateDataPtr mapped, MutableColumns& value_columns);
332
333
    /// Check and update reach_limit after emplace (execute path)
334
    void _check_limit_after_emplace();
335
    /// Check and update reach_limit after emplace (merge path, simpler: no topn multiplier)
336
    void _check_limit_after_emplace_for_merge();
337
338
    // ---- Sink-side profile counters (created by init_sink_profile) ----
339
    RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
340
    RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
341
    RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
342
    RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr;
343
    RuntimeProfile::Counter* _expr_timer = nullptr;
344
    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
345
    RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
346
    RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr;
347
    RuntimeProfile::Counter* _memory_usage_container = nullptr;
348
349
    // ---- Source-side profile counters (created by init_source_profile) ----
350
    RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr;
351
    RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
352
    RuntimeProfile::Counter* _insert_values_to_column_timer = nullptr;
353
354
    // Source-side counters for overlapping metrics (same names as sink, different profile).
355
    // Used during spill recovery merge path (for_spill=true) so that
356
    // PartitionedAggLocalState::_update_profile can read them from the inner source profile.
357
    RuntimeProfile::Counter* _source_merge_timer = nullptr;
358
    RuntimeProfile::Counter* _source_deserialize_data_timer = nullptr;
359
    RuntimeProfile::Counter* _source_hash_table_compute_timer = nullptr;
360
    RuntimeProfile::Counter* _source_hash_table_emplace_timer = nullptr;
361
    RuntimeProfile::Counter* _source_hash_table_input_counter = nullptr;
362
    RuntimeProfile::Counter* _source_hash_table_size_counter = nullptr;
363
    RuntimeProfile::Counter* _source_hash_table_memory_usage = nullptr;
364
    RuntimeProfile::Counter* _source_memory_usage_container = nullptr;
365
    RuntimeProfile::Counter* _source_memory_usage_arena = nullptr;
366
};
367
368
} // namespace doris