be/src/exec/common/groupby_agg_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <queue> |
21 | | |
22 | | #include "common/status.h" |
23 | | #include "core/block/block.h" |
24 | | #include "exec/common/agg_context.h" |
25 | | #include "exec/common/agg_utils.h" |
26 | | #include "runtime/runtime_profile.h" |
27 | | |
28 | | namespace doris { |
29 | | |
// Forward declarations: this header refers to these types only through raw pointers
// or std::shared_ptr, so their full definitions are not needed here.
class AggFnEvaluator;
class RuntimeState;
class VExprContext;

// Shared-ownership handle to an expression context, and a list of such handles.
using VExprContextSPtr = std::shared_ptr<VExprContext>;
using VExprContextSPtrs = std::vector<std::shared_ptr<VExprContext>>;
35 | | |
36 | | /// GroupByAggContext encapsulates all hash-table-based aggregation logic for GROUP BY queries. |
37 | | /// It is shared between AggSinkLocalState (write path) and AggLocalState (read path) in |
38 | | /// 2-phase aggregation, or owned locally by StreamingAggLocalState in 1-phase streaming agg. |
39 | | /// |
40 | | /// InlineCountAggContext (subclass) overrides virtual methods to implement the |
41 | | /// inline-count optimization (storing UInt64 count directly in the hash table mapped slot |
42 | | /// instead of a full aggregate state). |
43 | | class GroupByAggContext : public AggContext { |
44 | | public: |
45 | | GroupByAggContext(std::vector<AggFnEvaluator*> agg_evaluators, |
46 | | VExprContextSPtrs groupby_expr_ctxs, Sizes agg_state_offsets, |
47 | | size_t total_agg_state_size, size_t agg_state_alignment, bool is_first_phase); |
48 | | |
49 | | virtual ~GroupByAggContext(); |
50 | | |
51 | | // ==================== Aggregation execution (Sink side) ==================== |
52 | | |
53 | | /// Update mode: evaluate groupby exprs → emplace → execute_batch_add |
54 | | Status update(Block* block) override; |
55 | | |
56 | | /// Emplace + execute_batch_add with pre-evaluated key columns. |
57 | | /// InlineCountAggContext overrides to only emplace (count++ is done internally). |
58 | | /// Used by StreamingAgg which evaluates key expressions separately. |
59 | | virtual Status emplace_and_forward(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
60 | | uint32_t num_rows, Block* block, bool expand_hash_table); |
61 | | |
62 | | /// Merge mode: evaluate groupby exprs → emplace → deserialize_and_merge |
63 | | Status merge(Block* block) override; |
64 | | |
65 | | /// Merge for spill restore (keys already materialized as first N columns of block) |
66 | | /// (declared via AggContext::merge_for_spill override above) |
67 | | |
68 | | // ==================== Result output (Source side) ==================== |
69 | | |
70 | | /// Serialize mode output (for non-finalize path and StreamingAgg) |
71 | | Status serialize(RuntimeState* state, Block* block, bool* eos) override; |
72 | | |
73 | | /// Finalize mode output (for AggSource finalize path). |
74 | | /// Caller must call set_finalize_output() before the first call. |
75 | | Status finalize(RuntimeState* state, Block* block, bool* eos) override; |
76 | | |
77 | | /// Store output schema for finalize(). Converts RowDescriptor to ColumnsWithTypeAndName. |
78 | | void set_finalize_output(const RowDescriptor& row_desc) override; |
79 | | |
80 | | // ==================== Agg state management ==================== |
81 | | |
82 | | virtual Status create_agg_state(AggregateDataPtr data); |
83 | | void close() override; |
84 | | |
85 | | // ==================== Utilities ==================== |
86 | | |
87 | | size_t hash_table_size() const override; |
88 | | size_t memory_usage() const override; |
89 | | void update_memusage() override; |
90 | | void init_hash_method(); |
91 | | /// Initialize the AggregateDataContainer after hash method is set up. |
92 | | /// Must be called after init_hash_method(). |
93 | | virtual void init_agg_data_container(); |
94 | | Status reset_hash_table() override; |
95 | | |
96 | | /// Sink operator calls this to register sink-side profile counters. |
97 | | void init_sink_profile(RuntimeProfile* profile); |
98 | | /// Source operator calls this to register source-side profile counters. |
99 | | void init_source_profile(RuntimeProfile* profile) override; |
100 | | |
101 | | /// Evaluate groupby expressions on block, filling key_columns and optionally key_locs. |
102 | | /// Handles convert_to_full_column_if_const and replace_float_special_values. |
103 | | Status evaluate_groupby_keys(Block* block, ColumnRawPtrs& key_columns, |
104 | | std::vector<int>* key_locs = nullptr); |
105 | | |
106 | | // ==================== Sort limit ==================== |
107 | | |
108 | | void build_limit_heap(size_t hash_table_size); |
109 | | bool do_limit_filter(size_t num_rows, const ColumnRawPtrs& key_columns); |
110 | | void refresh_top_limit(size_t row_id, const ColumnRawPtrs& key_columns); |
111 | | /// Update limit heap with new top-N candidates from passthrough path. |
112 | | /// Finds the first row where cmp_res==1 && need_computes[i], inserts into heap, then breaks. |
113 | | void add_limit_heap_top(ColumnRawPtrs& key_columns, size_t rows); |
114 | | |
115 | | /// Emplace with sort-limit filtering. Returns true if aggregation should proceed. |
116 | | /// When key_locs is provided, re-fetches key_columns from block after filtering. |
117 | | bool emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block, |
118 | | const std::vector<int>* key_locs, ColumnRawPtrs& key_columns, |
119 | | uint32_t num_rows); |
120 | | |
121 | | // ==================== Streaming preagg support ==================== |
122 | | |
123 | | /// Check if preagg hash table should expand based on reduction statistics. |
124 | | /// Updates internal _should_expand_hash_table flag. |
125 | | bool should_expand_preagg_hash_table(int64_t input_rows, int64_t returned_rows, |
126 | | bool is_single_backend); |
127 | | |
128 | | /// Check if preagg should be skipped (passthrough mode). |
129 | | /// mem_limit: spill mem limit, 0 means no limit. Returns true if should skip. |
130 | | bool should_skip_preagg(size_t rows, size_t mem_limit, int64_t input_rows, |
131 | | int64_t returned_rows, bool is_single_backend); |
132 | | |
133 | | /// Passthrough serialize for streaming agg: serialize agg values directly without aggregating. |
134 | | Status streaming_serialize_passthrough(Block* in_block, Block* out_block, |
135 | | ColumnRawPtrs& key_columns, uint32_t rows, |
136 | | bool mem_reuse); |
137 | | |
138 | | /// Preagg emplace + forward using internal _places buffer and _should_expand_hash_table. |
139 | | Status preagg_emplace_and_forward(ColumnRawPtrs& key_columns, uint32_t num_rows, Block* block); |
140 | | |
141 | | /// Emplace with sort-limit + execute_batch_add using internal _places buffer. |
142 | | Status emplace_and_forward_limit(Block* block, ColumnRawPtrs& key_columns, uint32_t num_rows); |
143 | | |
144 | | /// Query whether hash table should be expanded (for streaming preagg). |
145 | 0 | bool should_expand_hash_table() const { return _should_expand_hash_table; } |
146 | | |
147 | | // ==================== Data accessors ==================== |
148 | | |
149 | 40 | AggregatedDataVariants* hash_table_data() { return _hash_table_data.get(); } |
150 | 1.34M | AggregateDataContainer* agg_data_container() { return _agg_data_container.get(); } |
151 | 37.0k | const VExprContextSPtrs& groupby_expr_ctxs() const { return _groupby_expr_ctxs; } |
152 | 8 | PaddedPODArray<uint8_t>& need_computes() { return _need_computes; } |
153 | | |
154 | | // Sort limit public state |
155 | | int64_t limit = -1; |
156 | | bool do_sort_limit = false; |
157 | | bool reach_limit = false; |
158 | | std::vector<int> order_directions; |
159 | | std::vector<int> null_directions; |
160 | | |
161 | | // Limit check configuration (set by operator during open) |
162 | | bool should_limit_output = false; |
163 | | bool enable_spill = false; |
164 | | |
165 | | // Key columns that need nullable wrapping in output (left/full join). |
166 | | // When non-empty, mem_reuse must be disabled in get_*_results to avoid |
167 | | // column type mismatch after make_nullable_output_key transforms the block. |
168 | | std::vector<size_t> make_nullable_keys; |
169 | | |
170 | | // Sink-side profile counters (public for operator-level SCOPED_TIMER access) |
171 | 113k | RuntimeProfile::Counter* expr_timer() const { return _expr_timer; } |
172 | | RuntimeProfile::Counter* hash_table_compute_timer() const { return _hash_table_compute_timer; } |
173 | | RuntimeProfile::Counter* hash_table_emplace_timer() const { return _hash_table_emplace_timer; } |
174 | | RuntimeProfile::Counter* hash_table_input_counter() const { return _hash_table_input_counter; } |
175 | | |
176 | | // Source-side profile counters |
177 | 0 | RuntimeProfile::Counter* hash_table_iterate_timer() const { return _hash_table_iterate_timer; } |
178 | 0 | RuntimeProfile::Counter* insert_keys_to_column_timer() const { |
179 | 0 | return _insert_keys_to_column_timer; |
180 | 0 | } |
181 | 0 | RuntimeProfile::Counter* insert_values_to_column_timer() const { |
182 | 0 | return _insert_values_to_column_timer; |
183 | 0 | } |
184 | 0 | RuntimeProfile::Counter* hash_table_limit_compute_timer() const { |
185 | 0 | return _hash_table_limit_compute_timer; |
186 | 0 | } |
187 | | |
188 | | // For spill: estimate memory needed |
189 | | size_t get_reserve_mem_size(RuntimeState* state) const override; |
190 | | |
191 | | /// Estimate memory needed to merge `rows` rows into the hash table. |
192 | | size_t estimated_memory_for_merging(size_t rows) const override; |
193 | | |
194 | | /// Apply limit/sort-limit filtering on the output block. |
195 | | /// Returns true if the caller should apply reached_limit() truncation. |
196 | | bool apply_limit_filter(Block* block) override; |
197 | | |
198 | | /// Merge for spill restore (keys already materialized as first N columns of block). |
199 | | Status merge_for_spill(Block* block) override; |
200 | | |
201 | | protected: |
202 | | // ==================== Internal hash table operations ==================== |
203 | | |
204 | | /// Insert keys into the hash table, fill places array. New keys get agg state created. |
205 | | /// Counter parameters allow callers to direct timing to sink or source profile counters. |
206 | | virtual void emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
207 | | uint32_t num_rows, |
208 | | RuntimeProfile::Counter* hash_table_compute_timer, |
209 | | RuntimeProfile::Counter* hash_table_emplace_timer, |
210 | | RuntimeProfile::Counter* hash_table_input_counter); |
211 | | |
212 | | /// Find existing keys in hash table (used when reach_limit && !do_sort_limit). |
213 | | void find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
214 | | uint32_t num_rows); |
215 | | |
216 | | virtual void destroy_agg_state(AggregateDataPtr data); |
217 | | |
218 | | /// Convert columns at specified positions to nullable. |
219 | | static void make_nullable_output_key(Block* block, |
220 | | const std::vector<size_t>& make_nullable_keys); |
221 | | |
222 | | /// Get the column id from an evaluator's input expression (used in merge path). |
223 | | /// Only valid for 1st phase evaluators with a single SlotRef input. |
224 | | static int get_slot_column_id(const AggFnEvaluator* evaluator); |
225 | | |
226 | | // Core hash table data |
227 | | AggregatedDataVariantsUPtr _hash_table_data; |
228 | | std::unique_ptr<AggregateDataContainer> _agg_data_container; |
229 | | |
230 | | // GroupBy-specific metadata |
231 | | VExprContextSPtrs _groupby_expr_ctxs; |
232 | | bool _is_first_phase; |
233 | | |
234 | | // Working buffers |
235 | | PODArray<AggregateDataPtr> _places; |
236 | | std::vector<char> _deserialize_buffer; |
237 | | std::vector<AggregateDataPtr> _values; |
238 | | |
239 | | // Streaming preagg state |
240 | | bool _should_expand_hash_table = true; |
241 | | |
242 | | // Finalize output schema (set by set_finalize_output, used by finalize) |
243 | | ColumnsWithTypeAndName _finalize_schema; |
244 | | |
245 | | // Sort limit state |
246 | | MutableColumns _limit_columns; |
247 | | int _limit_columns_min = -1; |
248 | | PaddedPODArray<uint8_t> _need_computes; |
249 | | std::vector<uint8_t> _cmp_res; |
250 | | |
251 | | struct HeapLimitCursor { |
252 | | HeapLimitCursor(int row_id, MutableColumns& limit_columns, |
253 | | std::vector<int>& order_directions, std::vector<int>& null_directions) |
254 | 166k | : _row_id(row_id), |
255 | 166k | _limit_columns(limit_columns), |
256 | 166k | _order_directions(order_directions), |
257 | 166k | _null_directions(null_directions) {} |
258 | | |
259 | | HeapLimitCursor(const HeapLimitCursor& other) = default; |
260 | | |
261 | | HeapLimitCursor(HeapLimitCursor&& other) noexcept |
262 | 1.01M | : _row_id(other._row_id), |
263 | 1.01M | _limit_columns(other._limit_columns), |
264 | 1.01M | _order_directions(other._order_directions), |
265 | 1.01M | _null_directions(other._null_directions) {} |
266 | | |
267 | | // Only copy _row_id. The three reference members (_limit_columns, _order_directions, |
268 | | // _null_directions) are not rebindable and all HeapLimitCursor instances reference the |
269 | | // same GroupByAggContext members, so skipping them is correct. |
270 | 0 | HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept { |
271 | 0 | _row_id = other._row_id; |
272 | 0 | return *this; |
273 | 0 | } |
274 | | |
275 | 2.05M | HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept { |
276 | 2.05M | _row_id = other._row_id; |
277 | 2.05M | return *this; |
278 | 2.05M | } |
279 | | |
280 | 1.94M | bool operator<(const HeapLimitCursor& rhs) const { |
281 | 2.07M | for (int i = 0; i < _limit_columns.size(); ++i) { |
282 | 2.07M | const auto& col = _limit_columns[i]; |
283 | 2.07M | auto res = col->compare_at(_row_id, rhs._row_id, *col, _null_directions[i]) * |
284 | 2.07M | _order_directions[i]; |
285 | 2.07M | if (res < 0) { |
286 | 1.01M | return true; |
287 | 1.06M | } else if (res > 0) { |
288 | 1.04M | return false; |
289 | 1.04M | } |
290 | 2.07M | } |
291 | 18.4E | return false; |
292 | 1.94M | } |
293 | | |
294 | | int _row_id; |
295 | | MutableColumns& _limit_columns; |
296 | | std::vector<int>& _order_directions; |
297 | | std::vector<int>& _null_directions; |
298 | | }; |
299 | | |
300 | | std::priority_queue<HeapLimitCursor> _limit_heap; |
301 | | MutableColumns _get_keys_hash_table(); |
302 | | |
303 | | template <bool limit, bool for_spill = false> |
304 | | Status _merge_with_serialized_key_helper(Block* block); |
305 | | |
306 | | // ---- Evaluator loop helpers ---- |
307 | | |
308 | | /// execute_batch_add for all evaluators. |
309 | | Status _execute_batch_add_evaluators(Block* block, AggregateDataPtr* places, |
310 | | bool expand_hash_table = false); |
311 | | |
312 | | /// execute_batch_add_selected for all evaluators (limit && !sort_limit path). |
313 | | Status _execute_batch_add_selected_evaluators(Block* block, AggregateDataPtr* places); |
314 | | |
315 | | /// Merge-mode evaluator loop using deserialize_and_merge_vec_selected (limit && !sort_limit). |
316 | | Status _merge_evaluators_selected(Block* block, size_t rows, |
317 | | RuntimeProfile::Counter* deser_timer); |
318 | | |
319 | | /// Merge-mode evaluator loop using deserialize_and_merge_vec. |
320 | | template <bool for_spill> |
321 | | Status _merge_evaluators(Block* block, size_t rows, RuntimeProfile::Counter* deser_timer); |
322 | | |
323 | | /// Serialize agg values into value_columns (for serialize()). |
324 | | void _serialize_agg_values(MutableColumns& value_columns, DataTypes& value_data_types, |
325 | | Block* block, bool mem_reuse, size_t key_size, uint32_t num_rows); |
326 | | |
327 | | /// Insert finalized agg results into value_columns for all evaluators. |
328 | | void _insert_finalized_values(MutableColumns& value_columns, uint32_t num_rows); |
329 | | |
330 | | /// Insert single-row finalized result (for null key). |
331 | | void _insert_finalized_single(AggregateDataPtr mapped, MutableColumns& value_columns); |
332 | | |
333 | | /// Check and update reach_limit after emplace (execute path) |
334 | | void _check_limit_after_emplace(); |
335 | | /// Check and update reach_limit after emplace (merge path, simpler: no topn multiplier) |
336 | | void _check_limit_after_emplace_for_merge(); |
337 | | |
338 | | // ---- Sink-side profile counters (created by init_sink_profile) ---- |
339 | | RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; |
340 | | RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; |
341 | | RuntimeProfile::Counter* _hash_table_input_counter = nullptr; |
342 | | RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr; |
343 | | RuntimeProfile::Counter* _expr_timer = nullptr; |
344 | | RuntimeProfile::Counter* _hash_table_size_counter = nullptr; |
345 | | RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; |
346 | | RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr; |
347 | | RuntimeProfile::Counter* _memory_usage_container = nullptr; |
348 | | |
349 | | // ---- Source-side profile counters (created by init_source_profile) ---- |
350 | | RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr; |
351 | | RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; |
352 | | RuntimeProfile::Counter* _insert_values_to_column_timer = nullptr; |
353 | | |
354 | | // Source-side counters for overlapping metrics (same names as sink, different profile). |
355 | | // Used during spill recovery merge path (for_spill=true) so that |
356 | | // PartitionedAggLocalState::_update_profile can read them from the inner source profile. |
357 | | RuntimeProfile::Counter* _source_merge_timer = nullptr; |
358 | | RuntimeProfile::Counter* _source_deserialize_data_timer = nullptr; |
359 | | RuntimeProfile::Counter* _source_hash_table_compute_timer = nullptr; |
360 | | RuntimeProfile::Counter* _source_hash_table_emplace_timer = nullptr; |
361 | | RuntimeProfile::Counter* _source_hash_table_input_counter = nullptr; |
362 | | RuntimeProfile::Counter* _source_hash_table_size_counter = nullptr; |
363 | | RuntimeProfile::Counter* _source_hash_table_memory_usage = nullptr; |
364 | | RuntimeProfile::Counter* _source_memory_usage_container = nullptr; |
365 | | RuntimeProfile::Counter* _source_memory_usage_arena = nullptr; |
366 | | }; |
367 | | |
368 | | } // namespace doris |