Coverage Report

Created: 2026-03-31 18:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/common/agg_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/status.h"
21
#include "core/arena.h"
22
#include "core/block/block.h"
23
#include "runtime/runtime_profile.h"
24
25
namespace doris {
26
27
class AggFnEvaluator;
28
class RuntimeState;
29
class RowDescriptor;
30
using AggregateDataPtr = char*; // Raw byte pointer; presumably to aggregate state storage (confirm).
31
using Sizes = std::vector<size_t>; // Used below as the type of AggContext's state offsets.
32
33
/// AggContext is the abstract base class for all aggregation context implementations.
34
/// It unifies GroupByAggContext (with hash table) and UngroupByAggContext (without GROUP BY)
35
/// behind a common interface aligned with AggMode semantics:
36
///   - update:    original data -> aggregate state  (INPUT_TO_*)
37
///   - merge:     intermediate state -> aggregate state  (BUFFER_TO_*)
38
///   - serialize: aggregate state -> intermediate state  (*_TO_BUFFER)
39
///   - finalize:  aggregate state -> final result  (*_TO_RESULT)
40
class AggContext {
41
public:
42
    /// Stores the evaluator pointer vector and the state offsets (both moved into members)
    /// together with the total aggregate state size and its alignment.
    AggContext(std::vector<AggFnEvaluator*> agg_evaluators, Sizes agg_state_offsets,
43
               size_t total_agg_state_size, size_t agg_state_alignment)
44
66
            : _agg_evaluators(std::move(agg_evaluators)),
45
66
              _agg_state_offsets(std::move(agg_state_offsets)),
46
66
              _total_agg_state_size(total_agg_state_size),
47
66
              _agg_state_alignment(agg_state_alignment) {}
48
49
66
    virtual ~AggContext() = default;
50
51
    // ==================== Sink-side operations ====================
52
53
    /// Update mode: feed original data into aggregate state.
54
    virtual Status update(Block* block) = 0;
55
56
    /// Merge mode: merge intermediate state into aggregate state.
57
    virtual Status merge(Block* block) = 0;
58
59
    // ==================== Source-side operations ====================
60
61
    /// Serialize: output intermediate aggregate state for downstream merge.
62
    virtual Status serialize(RuntimeState* state, Block* block, bool* eos) = 0;
63
64
    /// Finalize: output final aggregate results.
65
    /// Caller must call set_finalize_output() before the first finalize() call.
66
    virtual Status finalize(RuntimeState* state, Block* block, bool* eos) = 0;
67
68
    /// Set output schema for finalize(). Called once during source-side init. No-op by default.
69
    /// GroupByAggContext converts RowDescriptor to ColumnsWithTypeAndName.
70
    /// UngroupByAggContext stores the RowDescriptor reference.
71
0
    virtual void set_finalize_output(const RowDescriptor& row_desc) {}
72
73
    // ==================== Lifecycle ====================
74
75
    virtual void close() = 0;
76
    virtual void update_memusage() = 0;
77
78
    // ==================== Profile ====================
79
80
    /// Initialize source-side profile counters; no-op here. Only GroupByAggContext has any.
81
7
    virtual void init_source_profile(RuntimeProfile* /*profile*/) {}
82
83
    // ==================== Memory / hash table queries ====================
84
85
    /// Return total memory usage of this aggregation context (base implementation returns 0).
86
0
    virtual size_t memory_usage() const { return 0; }
87
88
    /// Return the number of rows in the hash table (0 for UngroupByAggContext).
89
0
    virtual size_t hash_table_size() const { return 0; }
90
91
    /// Reset the hash table (destroy agg states, clear buckets). No-op returning OK by default.
92
0
    virtual Status reset_hash_table() { return Status::OK(); }
93
94
    /// Estimate the memory that needs to be reserved (for spill decisions). Returns 0 by default.
95
0
    virtual size_t get_reserve_mem_size(RuntimeState* /*state*/) const { return 0; }
96
97
    /// Estimate memory needed to merge `rows` rows into the hash table (for spill decisions).
98
0
    virtual size_t estimated_memory_for_merging(size_t /*rows*/) const { return 0; }
99
100
    // ==================== Spill support ====================
101
102
    /// Merge for spill restore (keys already materialized as first N columns of block).
103
    /// Only GroupByAggContext implements this; UngroupByAggContext never spills.
104
0
    virtual Status merge_for_spill(Block* /*block*/) {
105
0
        return Status::InternalError("merge_for_spill not supported for this context");
106
0
    }
107
108
    // ==================== Limit support ====================
109
110
    /// Apply limit/sort-limit filtering on the output block.
111
    /// Returns true if the caller should apply reached_limit() truncation.
112
    /// Returns false if the block is already filtered (or no limit applies) — caller just counts.
113
7
    virtual bool apply_limit_filter(Block* /*block*/) { return false; }
114
115
    // ==================== Common accessors ====================
116
117
427
    std::vector<AggFnEvaluator*>& agg_evaluators() { return _agg_evaluators; }
118
87
    const Sizes& agg_state_offsets() const { return _agg_state_offsets; }
119
0
    size_t total_agg_state_size() const { return _total_agg_state_size; }
120
0
    size_t agg_state_alignment() const { return _agg_state_alignment; }
121
0
    Arena& agg_arena() { return _agg_arena; }
122
123
    // ==================== Common profile timer accessors ====================
124
125
8
    RuntimeProfile::Counter* build_timer() const { return _build_timer; }
126
60
    RuntimeProfile::Counter* merge_timer() const { return _merge_timer; }
127
0
    RuntimeProfile::Counter* deserialize_data_timer() const { return _deserialize_data_timer; }
128
0
    RuntimeProfile::Counter* get_results_timer() const { return _get_results_timer; }
129
130
    /// Memory tracking for reserve estimation.
131
    int64_t memory_usage_last_executing = 0;
132
133
protected:
134
    std::vector<AggFnEvaluator*> _agg_evaluators;
135
    Sizes _agg_state_offsets;
136
    size_t _total_agg_state_size;
137
    size_t _agg_state_alignment;
138
    Arena _agg_arena;
139
140
    // Common profile counters: nullptr until set by subclass init_profile / init_sink_profile.
141
    RuntimeProfile::Counter* _build_timer = nullptr;
142
    RuntimeProfile::Counter* _merge_timer = nullptr;
143
    RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
144
    RuntimeProfile::Counter* _get_results_timer = nullptr;
145
    RuntimeProfile::Counter* _memory_used_counter = nullptr;
146
    RuntimeProfile::Counter* _memory_usage_arena = nullptr;
147
};
148
149
} // namespace doris