be/src/exprs/vectorized_agg_fn.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exprs/vectorized_agg_fn.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <fmt/ranges.h> // IWYU pragma: keep |
22 | | #include <gen_cpp/Exprs_types.h> |
23 | | #include <gen_cpp/PlanNodes_types.h> |
24 | | #include <glog/logging.h> |
25 | | |
26 | | #include <memory> |
27 | | #include <ostream> |
28 | | #include <string_view> |
29 | | |
30 | | #include "common/config.h" |
31 | | #include "common/object_pool.h" |
32 | | #include "core/block/block.h" |
33 | | #include "core/block/column_with_type_and_name.h" |
34 | | #include "core/block/materialize_block.h" |
35 | | #include "core/data_type/data_type_agg_state.h" |
36 | | #include "core/data_type/data_type_factory.hpp" |
37 | | #include "exec/common/util.hpp" |
38 | | #include "exprs/aggregate/aggregate_function_ai_agg.h" |
39 | | #include "exprs/aggregate/aggregate_function_java_udaf.h" |
40 | | #include "exprs/aggregate/aggregate_function_python_udaf.h" |
41 | | #include "exprs/aggregate/aggregate_function_rpc.h" |
42 | | #include "exprs/aggregate/aggregate_function_simple_factory.h" |
43 | | #include "exprs/aggregate/aggregate_function_sort.h" |
44 | | #include "exprs/aggregate/aggregate_function_state_merge.h" |
45 | | #include "exprs/aggregate/aggregate_function_state_union.h" |
46 | | #include "exprs/vexpr.h" |
47 | | #include "exprs/vexpr_context.h" |
48 | | |
49 | | static constexpr int64_t BE_VERSION_THAT_SUPPORT_NULLABLE_CHECK = 8; |
50 | | |
51 | | namespace doris { |
52 | | class RowDescriptor; |
53 | | class Arena; |
54 | | class BufferWritable; |
55 | | class IColumn; |
56 | | } // namespace doris |
57 | | |
58 | | namespace doris { |
59 | | |
60 | | template <class FunctionType> |
61 | | AggregateFunctionPtr get_agg_state_function(const DataTypes& argument_types, |
62 | 516 | DataTypePtr return_type) { |
63 | 516 | return FunctionType::create( |
64 | 516 | assert_cast<const DataTypeAggState*>(argument_types[0].get())->get_nested_function(), |
65 | 516 | argument_types, return_type); |
66 | 516 | } _ZN5doris22get_agg_state_functionINS_19AggregateStateUnionEEESt10shared_ptrINS_18IAggregateFunctionEERKSt6vectorIS2_IKNS_9IDataTypeEESaIS8_EES8_ Line | Count | Source | 62 | 149 | DataTypePtr return_type) { | 63 | 149 | return FunctionType::create( | 64 | 149 | assert_cast<const DataTypeAggState*>(argument_types[0].get())->get_nested_function(), | 65 | 149 | argument_types, return_type); | 66 | 149 | } |
_ZN5doris22get_agg_state_functionINS_19AggregateStateMergeEEESt10shared_ptrINS_18IAggregateFunctionEERKSt6vectorIS2_IKNS_9IDataTypeEESaIS8_EES8_ Line | Count | Source | 62 | 367 | DataTypePtr return_type) { | 63 | 367 | return FunctionType::create( | 64 | 367 | assert_cast<const DataTypeAggState*>(argument_types[0].get())->get_nested_function(), | 65 | 367 | argument_types, return_type); | 66 | 367 | } |
|
67 | | |
68 | | AggFnEvaluator::AggFnEvaluator(const TExprNode& desc, const bool without_key, |
69 | | const bool is_window_function) |
70 | 184k | : _fn(desc.fn), |
71 | 184k | _is_merge(desc.agg_expr.is_merge_agg), |
72 | 184k | _without_key(without_key), |
73 | 184k | _is_window_function(is_window_function), |
74 | 184k | _data_type(DataTypeFactory::instance().create_data_type( |
75 | 18.4E | desc.fn.ret_type, desc.__isset.is_nullable ? desc.is_nullable : true)) { |
76 | 184k | if (desc.agg_expr.__isset.param_types) { |
77 | 184k | const auto& param_types = desc.agg_expr.param_types; |
78 | 184k | for (const auto& param_type : param_types) { |
79 | 149k | _argument_types_with_sort.push_back( |
80 | 149k | DataTypeFactory::instance().create_data_type(param_type)); |
81 | 149k | } |
82 | 184k | } |
83 | 184k | } |
84 | | |
85 | | Status AggFnEvaluator::create(ObjectPool* pool, const TExpr& desc, const TSortInfo& sort_info, |
86 | | const bool without_key, const bool is_window_function, |
87 | 184k | AggFnEvaluator** result) { |
88 | 184k | *result = |
89 | 184k | pool->add(AggFnEvaluator::create_unique(desc.nodes[0], without_key, is_window_function) |
90 | 184k | .release()); |
91 | 184k | auto& agg_fn_evaluator = *result; |
92 | 184k | int node_idx = 0; |
93 | 353k | for (int i = 0; i < desc.nodes[0].num_children; ++i) { |
94 | 168k | ++node_idx; |
95 | 168k | VExprSPtr expr; |
96 | 168k | VExprContextSPtr ctx; |
97 | 168k | RETURN_IF_ERROR(VExpr::create_tree_from_thrift(desc.nodes, &node_idx, expr, ctx)); |
98 | 168k | agg_fn_evaluator->_input_exprs_ctxs.push_back(ctx); |
99 | 168k | } |
100 | | |
101 | 184k | auto sort_size = sort_info.ordering_exprs.size(); |
102 | 184k | auto real_arguments_size = agg_fn_evaluator->_argument_types_with_sort.size() - sort_size; |
103 | | // Child arguments contains [real arguments, order by arguments], we pass the arguments |
104 | | // to the order by functions |
105 | 184k | for (int i = 0; i < sort_size; ++i) { |
106 | 119 | agg_fn_evaluator->_sort_description.emplace_back(real_arguments_size + i, |
107 | 119 | sort_info.is_asc_order[i] ? 1 : -1, |
108 | 119 | sort_info.nulls_first[i] ? -1 : 1); |
109 | 119 | } |
110 | | |
111 | | // Pass the real arguments to get functions |
112 | 333k | for (int i = 0; i < real_arguments_size; ++i) { |
113 | 149k | agg_fn_evaluator->_real_argument_types.emplace_back( |
114 | 149k | agg_fn_evaluator->_argument_types_with_sort[i]); |
115 | 149k | } |
116 | 184k | return Status::OK(); |
117 | 184k | } |
118 | | |
119 | | Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc, |
120 | | const SlotDescriptor* intermediate_slot_desc, |
121 | 184k | const SlotDescriptor* output_slot_desc) { |
122 | 184k | DCHECK(intermediate_slot_desc != nullptr); |
123 | 184k | DCHECK(_intermediate_slot_desc == nullptr); |
124 | 184k | _output_slot_desc = output_slot_desc; |
125 | 184k | _intermediate_slot_desc = intermediate_slot_desc; |
126 | | |
127 | 184k | Status status = VExpr::prepare(_input_exprs_ctxs, state, desc); |
128 | 184k | RETURN_IF_ERROR(status); |
129 | | |
130 | 184k | DataTypes tmp_argument_types; |
131 | 184k | tmp_argument_types.reserve(_input_exprs_ctxs.size()); |
132 | | |
133 | 184k | std::vector<std::string_view> child_expr_name; |
134 | | |
135 | | // prepare for argument |
136 | 184k | for (auto& _input_exprs_ctx : _input_exprs_ctxs) { |
137 | 168k | auto data_type = _input_exprs_ctx->root()->data_type(); |
138 | 168k | tmp_argument_types.emplace_back(data_type); |
139 | 168k | child_expr_name.emplace_back(_input_exprs_ctx->root()->expr_name()); |
140 | 168k | } |
141 | | |
142 | 184k | std::vector<std::string> column_names; |
143 | 184k | for (const auto& expr_ctx : _input_exprs_ctxs) { |
144 | 168k | const auto& root = expr_ctx->root(); |
145 | 168k | if (!root->expr_name().empty() && !root->is_constant()) { |
146 | 73.4k | column_names.emplace_back(root->expr_name()); |
147 | 73.4k | } |
148 | 168k | } |
149 | | |
150 | 184k | const DataTypes& argument_types = |
151 | 184k | _real_argument_types.empty() ? tmp_argument_types : _real_argument_types; |
152 | | |
153 | 184k | if (_fn.binary_type == TFunctionBinaryType::JAVA_UDF) { |
154 | 115 | if (config::enable_java_support) { |
155 | 115 | _function = AggregateJavaUdaf::create(_fn, argument_types, _data_type); |
156 | 115 | RETURN_IF_ERROR(static_cast<AggregateJavaUdaf*>(_function.get())->check_udaf(_fn)); |
157 | 115 | } else { |
158 | 0 | return Status::InternalError( |
159 | 0 | "Java UDAF is not enabled, you can change be config enable_java_support to " |
160 | 0 | "true and restart be."); |
161 | 0 | } |
162 | 184k | } else if (_fn.binary_type == TFunctionBinaryType::PYTHON_UDF) { |
163 | 679 | if (config::enable_python_udf_support) { |
164 | 679 | _function = AggregatePythonUDAF::create(_fn, argument_types, _data_type); |
165 | 679 | RETURN_IF_ERROR(static_cast<AggregatePythonUDAF*>(_function.get())->open()); |
166 | 679 | LOG(INFO) << fmt::format( |
167 | 679 | "Created Python UDAF: {}, runtime_version: {}, function_code: {}", |
168 | 679 | _fn.name.function_name, _fn.runtime_version, _fn.function_code); |
169 | 679 | } else { |
170 | 0 | return Status::InternalError( |
171 | 0 | "Python UDAF is not enabled, you can change be config " |
172 | 0 | "enable_python_udf_support to true and restart be."); |
173 | 0 | } |
174 | 184k | } else if (_fn.binary_type == TFunctionBinaryType::RPC) { |
175 | 0 | _function = AggregateRpcUdaf::create(_fn, argument_types, _data_type); |
176 | 184k | } else if (_fn.binary_type == TFunctionBinaryType::AGG_STATE) { |
177 | 517 | if (argument_types.size() != 1) { |
178 | 0 | return Status::InternalError("Agg state Function must input 1 argument but get {}", |
179 | 0 | argument_types.size()); |
180 | 0 | } |
181 | 517 | if (argument_types[0]->is_nullable()) { |
182 | 0 | return Status::InternalError("Agg state function input type must be not nullable"); |
183 | 0 | } |
184 | 517 | if (argument_types[0]->get_primitive_type() != PrimitiveType::TYPE_AGG_STATE) { |
185 | 0 | return Status::InternalError( |
186 | 0 | "Agg state function input type must be agg_state but get {}", |
187 | 0 | argument_types[0]->get_family_name()); |
188 | 0 | } |
189 | | |
190 | 517 | std::string type_function_name = |
191 | 517 | assert_cast<const DataTypeAggState*>(argument_types[0].get())->get_function_name(); |
192 | 517 | if (type_function_name + AGG_UNION_SUFFIX == _fn.name.function_name) { |
193 | 149 | if (_data_type->is_nullable()) { |
194 | 0 | return Status::InternalError( |
195 | 0 | "Union function return type must be not nullable, real={}", |
196 | 0 | _data_type->get_name()); |
197 | 0 | } |
198 | 149 | if (_data_type->get_primitive_type() != PrimitiveType::TYPE_AGG_STATE) { |
199 | 0 | return Status::InternalError( |
200 | 0 | "Union function return type must be AGG_STATE, real={}", |
201 | 0 | _data_type->get_name()); |
202 | 0 | } |
203 | 149 | _function = get_agg_state_function<AggregateStateUnion>(argument_types, _data_type); |
204 | 368 | } else if (type_function_name + AGG_MERGE_SUFFIX == _fn.name.function_name) { |
205 | 367 | auto type = assert_cast<const DataTypeAggState*>(argument_types[0].get()) |
206 | 367 | ->get_nested_function() |
207 | 367 | ->get_return_type(); |
208 | 367 | if (!type->equals(*_data_type)) { |
209 | 0 | return Status::InternalError("{}'s expect return type is {}, but input {}", |
210 | 0 | argument_types[0]->get_name(), type->get_name(), |
211 | 0 | _data_type->get_name()); |
212 | 0 | } |
213 | 367 | _function = get_agg_state_function<AggregateStateMerge>(argument_types, _data_type); |
214 | 367 | } else { |
215 | 1 | return Status::InternalError("{} not match function {}", argument_types[0]->get_name(), |
216 | 1 | _fn.name.function_name); |
217 | 1 | } |
218 | 183k | } else { |
219 | 183k | const bool is_foreach = |
220 | 183k | AggregateFunctionSimpleFactory::is_foreach(_fn.name.function_name) || |
221 | 183k | AggregateFunctionSimpleFactory::is_foreachv2(_fn.name.function_name); |
222 | | // Here, only foreachv1 needs special treatment, and v2 can follow the normal code logic. |
223 | 183k | if (AggregateFunctionSimpleFactory::is_foreach(_fn.name.function_name)) { |
224 | 0 | _function = AggregateFunctionSimpleFactory::instance().get( |
225 | 0 | _fn.name.function_name, argument_types, _data_type, |
226 | 0 | AggregateFunctionSimpleFactory::result_nullable_by_foreach(_data_type), |
227 | 0 | state->be_exec_version(), |
228 | 0 | {.is_window_function = _is_window_function, |
229 | 0 | .is_foreach = is_foreach, |
230 | 0 | .enable_aggregate_function_null_v2 = |
231 | 0 | state->enable_aggregate_function_null_v2(), |
232 | 0 | .column_names = std::move(column_names)}); |
233 | 183k | } else { |
234 | 183k | _function = AggregateFunctionSimpleFactory::instance().get( |
235 | 183k | _fn.name.function_name, argument_types, _data_type, _data_type->is_nullable(), |
236 | 183k | state->be_exec_version(), |
237 | 183k | {.is_window_function = _is_window_function, |
238 | 183k | .is_foreach = is_foreach, |
239 | 183k | .enable_aggregate_function_null_v2 = |
240 | 183k | state->enable_aggregate_function_null_v2(), |
241 | 183k | .column_names = std::move(column_names)}); |
242 | 183k | } |
243 | 183k | } |
244 | 184k | if (_function == nullptr) { |
245 | 0 | return Status::InternalError("Agg Function {} is not implemented", _fn.signature); |
246 | 0 | } |
247 | | |
248 | 184k | if (!_sort_description.empty()) { |
249 | 101 | _function = transform_to_sort_agg_function(_function, _argument_types_with_sort, |
250 | 101 | _sort_description, state); |
251 | 101 | } |
252 | | |
253 | 184k | if (_fn.name.function_name == "ai_agg") { |
254 | 0 | _function->set_query_context(state->get_query_ctx()); |
255 | 0 | } |
256 | | |
257 | | // Foreachv2, like foreachv1, does not check the return type, |
258 | | // because its return type is related to the internal agg. |
259 | 184k | if (!AggregateFunctionSimpleFactory::is_foreach(_fn.name.function_name) && |
260 | 184k | !AggregateFunctionSimpleFactory::is_foreachv2(_fn.name.function_name)) { |
261 | 184k | if (state->be_exec_version() >= BE_VERSION_THAT_SUPPORT_NULLABLE_CHECK) { |
262 | 184k | RETURN_IF_ERROR( |
263 | 184k | _function->verify_result_type(_without_key, argument_types, _data_type)); |
264 | 184k | } |
265 | 184k | } |
266 | 184k | _expr_name = fmt::format("{}({})", _fn.name.function_name, child_expr_name); |
267 | 184k | return Status::OK(); |
268 | 184k | } |
269 | | |
270 | 184k | Status AggFnEvaluator::open(RuntimeState* state) { |
271 | 184k | return VExpr::open(_input_exprs_ctxs, state); |
272 | 184k | } |
273 | | |
274 | 3.66M | void AggFnEvaluator::create(AggregateDataPtr place) { |
275 | 3.66M | _function->create(place); |
276 | 3.66M | } |
277 | | |
278 | 9.76k | void AggFnEvaluator::destroy(AggregateDataPtr place) { |
279 | 9.76k | _function->destroy(place); |
280 | 9.76k | } |
281 | | |
282 | 230k | Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena& arena) { |
283 | 230k | RETURN_IF_ERROR(_calc_argument_columns(block)); |
284 | 230k | _function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena); |
285 | 230k | return Status::OK(); |
286 | 230k | } |
287 | | |
288 | | Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, |
289 | 90.6k | Arena& arena, bool agg_many) { |
290 | 90.6k | RETURN_IF_ERROR(_calc_argument_columns(block)); |
291 | 90.6k | _function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many); |
292 | 90.6k | return Status::OK(); |
293 | 90.6k | } |
294 | | |
295 | | Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, |
296 | 2 | AggregateDataPtr* places, Arena& arena) { |
297 | 2 | RETURN_IF_ERROR(_calc_argument_columns(block)); |
298 | 2 | _function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena); |
299 | 2 | return Status::OK(); |
300 | 2 | } |
301 | | |
302 | | Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, |
303 | 24 | const size_t num_rows, Arena& arena) { |
304 | 24 | RETURN_IF_ERROR(_calc_argument_columns(block)); |
305 | 24 | _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena); |
306 | 24 | return Status::OK(); |
307 | 24 | } |
308 | | |
309 | 85.8k | void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) { |
310 | 85.8k | _function->insert_result_into(place, *column); |
311 | 85.8k | } |
312 | | |
313 | | void AggFnEvaluator::insert_result_info_vec(const std::vector<AggregateDataPtr>& places, |
314 | 56.5k | size_t offset, IColumn* column, const size_t num_rows) { |
315 | 56.5k | _function->insert_result_into_vec(places, offset, *column, num_rows); |
316 | 56.5k | } |
317 | | |
318 | 15.4k | void AggFnEvaluator::reset(AggregateDataPtr place) { |
319 | 15.4k | _function->reset(place); |
320 | 15.4k | } |
321 | | |
322 | 0 | std::string AggFnEvaluator::debug_string(const std::vector<AggFnEvaluator*>& exprs) { |
323 | 0 | std::stringstream out; |
324 | 0 | out << "["; |
325 | |
|
326 | 0 | for (int i = 0; i < exprs.size(); ++i) { |
327 | 0 | out << (i == 0 ? "" : " ") << exprs[i]->debug_string(); |
328 | 0 | } |
329 | |
|
330 | 0 | out << "]"; |
331 | 0 | return out.str(); |
332 | 0 | } |
333 | | |
334 | 0 | std::string AggFnEvaluator::debug_string() const { |
335 | 0 | std::stringstream out; |
336 | 0 | out << "AggFnEvaluator("; |
337 | 0 | out << _fn.signature; |
338 | 0 | out << ")"; |
339 | 0 | return out.str(); |
340 | 0 | } |
341 | | |
342 | 321k | Status AggFnEvaluator::_calc_argument_columns(Block* block) { |
343 | 321k | SCOPED_TIMER(_expr_timer); |
344 | 321k | _agg_columns.resize(_input_exprs_ctxs.size()); |
345 | 321k | std::vector<int> column_ids(_input_exprs_ctxs.size()); |
346 | 577k | for (int i = 0; i < _input_exprs_ctxs.size(); ++i) { |
347 | 256k | int column_id = -1; |
348 | 256k | RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, &column_id)); |
349 | 256k | column_ids[i] = column_id; |
350 | 256k | } |
351 | 321k | materialize_block_inplace(*block, column_ids.data(), |
352 | 321k | column_ids.data() + _input_exprs_ctxs.size()); |
353 | 577k | for (int i = 0; i < _input_exprs_ctxs.size(); ++i) { |
354 | 256k | _agg_columns[i] = block->get_by_position(column_ids[i]).column.get(); |
355 | 256k | } |
356 | 321k | return Status::OK(); |
357 | 321k | } |
358 | | |
359 | 320k | AggFnEvaluator* AggFnEvaluator::clone(RuntimeState* state, ObjectPool* pool) { |
360 | 320k | return pool->add(AggFnEvaluator::create_unique(*this, state).release()); |
361 | 320k | } |
362 | | |
363 | | AggFnEvaluator::AggFnEvaluator(AggFnEvaluator& evaluator, RuntimeState* state) |
364 | 320k | : _fn(evaluator._fn), |
365 | 320k | _is_merge(evaluator._is_merge), |
366 | 320k | _without_key(evaluator._without_key), |
367 | 320k | _is_window_function(evaluator._is_window_function), |
368 | 320k | _argument_types_with_sort(evaluator._argument_types_with_sort), |
369 | 320k | _real_argument_types(evaluator._real_argument_types), |
370 | 320k | _intermediate_slot_desc(evaluator._intermediate_slot_desc), |
371 | 320k | _output_slot_desc(evaluator._output_slot_desc), |
372 | 320k | _sort_description(evaluator._sort_description), |
373 | 320k | _data_type(evaluator._data_type), |
374 | 320k | _function(evaluator._function), |
375 | 320k | _expr_name(evaluator._expr_name), |
376 | 320k | _agg_columns(evaluator._agg_columns) { |
377 | 320k | if (evaluator._fn.binary_type == TFunctionBinaryType::JAVA_UDF) { |
378 | 720 | DataTypes tmp_argument_types; |
379 | 720 | tmp_argument_types.reserve(evaluator._input_exprs_ctxs.size()); |
380 | | // prepare for argument |
381 | 774 | for (auto& _input_exprs_ctx : evaluator._input_exprs_ctxs) { |
382 | 774 | auto data_type = _input_exprs_ctx->root()->data_type(); |
383 | 774 | tmp_argument_types.emplace_back(data_type); |
384 | 774 | } |
385 | 720 | const DataTypes& argument_types = |
386 | 720 | _real_argument_types.empty() ? tmp_argument_types : _real_argument_types; |
387 | 720 | _function = AggregateJavaUdaf::create(evaluator._fn, argument_types, evaluator._data_type); |
388 | 720 | THROW_IF_ERROR(static_cast<AggregateJavaUdaf*>(_function.get())->check_udaf(evaluator._fn)); |
389 | 720 | } |
390 | 320k | DCHECK(_function != nullptr); |
391 | | |
392 | 320k | _input_exprs_ctxs.resize(evaluator._input_exprs_ctxs.size()); |
393 | 605k | for (size_t i = 0; i < _input_exprs_ctxs.size(); i++) { |
394 | 285k | WARN_IF_ERROR(evaluator._input_exprs_ctxs[i]->clone(state, _input_exprs_ctxs[i]), ""); |
395 | 285k | } |
396 | 320k | } |
397 | | |
398 | | Status AggFnEvaluator::check_agg_fn_output(uint32_t key_size, |
399 | | const std::vector<AggFnEvaluator*>& agg_fn, |
400 | 33.8k | const RowDescriptor& output_row_desc) { |
401 | 33.8k | auto name_and_types = VectorizedUtils::create_name_and_data_types(output_row_desc); |
402 | 128k | for (uint32_t i = key_size, j = 0; i < name_and_types.size(); i++, j++) { |
403 | 95.0k | auto&& [name, column_type] = name_and_types[i]; |
404 | 95.0k | auto agg_return_type = agg_fn[j]->function()->get_return_type(); |
405 | 95.0k | if (!column_type->equals(*agg_return_type)) { |
406 | 14.9k | if (!column_type->is_nullable() || agg_return_type->is_nullable() || |
407 | 14.9k | !remove_nullable(column_type)->equals(*agg_return_type)) { |
408 | 0 | return Status::InternalError( |
409 | 0 | "column_type not match data_types in agg node, column_type={}, " |
410 | 0 | "data_types={},column name={}", |
411 | 0 | column_type->get_name(), agg_return_type->get_name(), name); |
412 | 0 | } |
413 | 14.9k | } |
414 | 95.0k | } |
415 | 33.8k | return Status::OK(); |
416 | 33.8k | } |
417 | | |
418 | 1.48M | bool AggFnEvaluator::is_blockable() const { |
419 | 1.48M | return _function->is_blockable() || |
420 | 1.48M | std::any_of(_input_exprs_ctxs.begin(), _input_exprs_ctxs.end(), |
421 | 1.48M | [](VExprContextSPtr ctx) { return ctx->root()->is_blockable(); }); |
422 | 1.48M | } |
423 | | |
424 | | } // namespace doris |