be/src/exprs/lambda_function/varray_map_function.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <algorithm> |
19 | | #include <memory> |
20 | | #include <set> |
21 | | #include <string> |
22 | | #include <vector> |
23 | | |
24 | | #include "common/status.h" |
25 | | #include "core/assert_cast.h" |
26 | | #include "core/block/block.h" |
27 | | #include "core/block/column_numbers.h" |
28 | | #include "core/block/column_with_type_and_name.h" |
29 | | #include "core/block/columns_with_type_and_name.h" |
30 | | #include "core/column/column.h" |
31 | | #include "core/column/column_array.h" |
32 | | #include "core/column/column_nothing.h" |
33 | | #include "core/column/column_nullable.h" |
34 | | #include "core/column/column_vector.h" |
35 | | #include "core/data_type/data_type.h" |
36 | | #include "core/data_type/data_type_array.h" |
37 | | #include "core/data_type/data_type_nullable.h" |
38 | | #include "core/data_type/data_type_number.h" |
39 | | #include "exec/common/util.hpp" |
40 | | #include "exprs/aggregate/aggregate_function.h" |
41 | | #include "exprs/lambda_function/lambda_execution_context.h" |
42 | | #include "exprs/lambda_function/lambda_function.h" |
43 | | #include "exprs/lambda_function/lambda_function_factory.h" |
44 | | #include "exprs/vcolumn_ref.h" |
45 | | #include "exprs/vexpr_context.h" |
46 | | #include "exprs/vlambda_function_expr.h" |
47 | | |
48 | | namespace doris { |
49 | | |
50 | | // extend a block with all required parameters |
51 | | struct LambdaArgs { |
52 | | // index of the row of the original block that is currently being extended |
53 | | int64_t current_row_idx = 0; |
54 | | // when a block is filled, the array may be truncated, recording where it was truncated |
55 | | int64_t current_offset_in_array = 0; |
56 | | // the beginning position of the array |
57 | | size_t array_start = 0; |
58 | | // the size of the array |
59 | | int64_t cur_size = 0; |
60 | | // offset of column array |
61 | | const ColumnArray::Offsets64* offsets_ptr = nullptr; |
62 | | // how many times the current input row's data must be repeated when expanding |
63 | | int current_repeat_times = 0; |
64 | | // whether the current row of the original block has been extended |
65 | | bool current_row_eos = false; |
66 | | }; |
67 | | |
68 | | class ArrayMapFunction : public LambdaFunction { |
69 | | ENABLE_FACTORY_CREATOR(ArrayMapFunction); |
70 | | |
71 | | public: |
72 | 27 | ~ArrayMapFunction() override = default; |
73 | | |
74 | | static constexpr auto name = "array_map"; |
75 | | |
76 | 27 | static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); } |
77 | | |
78 | 0 | std::string get_name() const override { return name; } |
79 | | |
80 | 27 | Status prepare(RuntimeState* state, const VExprSPtrs& children) override { |
81 | 27 | RETURN_IF_ERROR(LambdaFunction::prepare(state, children)); |
82 | 27 | DCHECK_GE(children.size(), 2); |
83 | | |
84 | 27 | return _prepare_lambda_argument_binding(children[0], children.size() - 1, |
85 | 27 | _lambda_argument_binding); |
86 | 27 | } |
87 | | |
88 | | Status execute(VExprContext* context, const Block* block, const Selector* expr_selector, |
89 | | size_t count, ColumnPtr& result_column, const DataTypePtr& result_type, |
90 | 16 | const VExprSPtrs& children) const override { |
91 | 16 | LambdaArgs args_info; |
92 | | |
93 | | ///* array_map(lambda,arg1,arg2,.....) */// |
94 | | //1. child[1:end]->execute(src_block) |
95 | 16 | ColumnsWithTypeAndName arguments(children.size() - 1); |
96 | 34 | for (int i = 1; i < children.size(); ++i) { |
97 | 18 | ColumnPtr column; |
98 | 18 | RETURN_IF_ERROR( |
99 | 18 | children[i]->execute_column(context, block, expr_selector, count, column)); |
100 | 18 | arguments[i - 1].column = column; |
101 | 18 | arguments[i - 1].type = children[i]->execute_type(block); |
102 | 18 | arguments[i - 1].name = children[i]->expr_name(); |
103 | 18 | } |
104 | | |
105 | | // used to save the outer null map of the array column |
106 | 16 | auto outside_null_map = ColumnUInt8::create( |
107 | 16 | arguments[0].column->convert_to_full_column_if_const()->size(), 0); |
108 | | // offset column |
109 | 16 | ColumnPtr array_column_offset; |
110 | 16 | size_t nested_array_column_rows = 0; |
111 | 16 | ColumnPtr first_array_offsets = nullptr; |
112 | | //2. get the result column from executed expr, and the needed is nested column of array |
113 | 16 | std::vector<ColumnPtr> lambda_datas(arguments.size()); |
114 | 16 | DataTypes lambda_argument_types(arguments.size()); |
115 | | |
116 | 34 | for (int i = 0; i < arguments.size(); ++i) { |
117 | 18 | const auto& array_column_type_name = arguments[i]; |
118 | 18 | auto column_array = array_column_type_name.column->convert_to_full_column_if_const(); |
119 | 18 | auto type_array = array_column_type_name.type; |
120 | 18 | if (type_array->is_nullable()) { |
121 | | // get the nullmap of nullable column |
122 | | // hold the null column instead of a reference 'cause `column_array` will be assigned and freed below. |
123 | 1 | DORIS_CHECK(is_column_nullable(*column_array)); |
124 | 1 | auto column_array_nullmap = |
125 | 1 | assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr(); |
126 | | |
127 | | // get the array column from nullable column |
128 | 1 | column_array = assert_cast<const ColumnNullable*>(column_array.get()) |
129 | 1 | ->get_nested_column_ptr(); |
130 | | |
131 | | // get the nested type from nullable type |
132 | 1 | type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get()) |
133 | 1 | ->get_nested_type(); |
134 | | |
135 | | // need to union nullmap from all columns |
136 | 1 | VectorizedUtils::update_null_map(outside_null_map->get_data(), |
137 | 1 | column_array_nullmap->get_data()); |
138 | 1 | } |
139 | | |
140 | | // here is the array column |
141 | 18 | const auto& col_array = assert_cast<const ColumnArray&>(*column_array); |
142 | | |
143 | 18 | if (i == 0) { |
144 | 16 | nested_array_column_rows = col_array.get_data_ptr()->size(); |
145 | 16 | first_array_offsets = col_array.get_offsets_ptr(); |
146 | 16 | array_column_offset = first_array_offsets; |
147 | 16 | args_info.offsets_ptr = &col_array.get_offsets(); |
148 | 16 | } else { |
149 | | // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2; |
150 | | // c_array1: [0,1,2,3,4,5,6,7,8,9] |
151 | 2 | const auto& array_offsets = |
152 | 2 | assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets) |
153 | 2 | .get_data(); |
154 | 2 | if (nested_array_column_rows != col_array.get_data_ptr()->size() || |
155 | 2 | (!array_offsets.empty() && |
156 | 2 | memcmp(array_offsets.data(), col_array.get_offsets().data(), |
157 | 2 | sizeof(array_offsets[0]) * array_offsets.size()) != 0)) { |
158 | 0 | return Status::InvalidArgument( |
159 | 0 | "in array map function, the input column size " |
160 | 0 | "are " |
161 | 0 | "not equal completely, nested column data rows 1st size is {}, {}th " |
162 | 0 | "size is {}.", |
163 | 0 | nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); |
164 | 0 | } |
165 | 2 | } |
166 | 18 | lambda_datas[i] = col_array.get_data_ptr(); |
167 | 18 | const auto& col_type = assert_cast<const DataTypeArray&>(*type_array); |
168 | 18 | lambda_argument_types[i] = col_type.get_nested_type(); |
169 | 18 | } |
170 | 16 | std::set<int> required_input_column_ids; |
171 | 16 | children[0]->collect_slot_column_ids(required_input_column_ids); |
172 | 16 | context->lambda_execution_context().collect_visible_binding_column_positions( |
173 | 16 | required_input_column_ids); |
174 | 16 | const int lambda_argument_base = |
175 | 16 | required_input_column_ids.empty() ? 0 : *required_input_column_ids.rbegin() + 1; |
176 | 16 | if (!_lambda_argument_binding.bind_by_name) { |
177 | 1 | RETURN_IF_ERROR( |
178 | 1 | _set_legacy_lambda_argument_gap(children[0]->get_child(0), lambda_argument_base, |
179 | 1 | _lambda_argument_binding.argument_size)); |
180 | 1 | } |
181 | 16 | std::vector<std::string> names(lambda_argument_base); |
182 | 16 | DataTypes data_types(lambda_argument_base); |
183 | 16 | std::vector<bool> materialized_input_columns(lambda_argument_base, false); |
184 | 16 | names.reserve(lambda_argument_base + arguments.size()); |
185 | 16 | data_types.reserve(lambda_argument_base + arguments.size()); |
186 | 16 | for (int column_id : required_input_column_ids) { |
187 | 12 | if (column_id < 0 || static_cast<size_t>(column_id) >= block->columns()) { |
188 | 0 | return Status::InternalError( |
189 | 0 | "array_map lambda input column id {} is outside input block, block={}", |
190 | 0 | column_id, block->dump_structure()); |
191 | 0 | } |
192 | 12 | materialized_input_columns[column_id] = true; |
193 | 12 | names[column_id] = block->get_by_position(column_id).name; |
194 | 12 | data_types[column_id] = block->get_by_position(column_id).type; |
195 | 12 | } |
196 | 37 | for (int i = 0; i < lambda_argument_base; ++i) { |
197 | 21 | if (!materialized_input_columns[i]) { |
198 | | // Keep sparse input positions stable for SlotRef/parent lambda bindings without |
199 | | // materializing unrelated wide-table columns into every lambda batch. |
200 | 9 | names[i] = "temp"; |
201 | 9 | data_types[i] = std::make_shared<DataTypeUInt8>(); |
202 | 9 | } |
203 | 21 | } |
204 | 34 | for (int i = 0; i < arguments.size(); ++i) { |
205 | 18 | const auto& array_column_type_name = arguments[i]; |
206 | 18 | if (_lambda_argument_binding.bind_by_name && |
207 | 18 | i < _lambda_argument_binding.names.size()) { |
208 | 16 | names.push_back(_lambda_argument_binding.names[i]); |
209 | 16 | } else { |
210 | 2 | names.push_back("R" + array_column_type_name.name); |
211 | 2 | } |
212 | 18 | data_types.push_back(lambda_argument_types[i]); |
213 | 18 | } |
214 | | |
215 | 16 | LambdaExecutionContext::Frame lambda_frame; |
216 | 16 | lambda_frame.bind_by_name = _lambda_argument_binding.bind_by_name; |
217 | 16 | lambda_frame.parent_bindings_visible = true; |
218 | 33 | for (int i = 0; i < _lambda_argument_binding.argument_size; ++i) { |
219 | 17 | const int column_position = lambda_argument_base + i; |
220 | 17 | if (_lambda_argument_binding.bind_by_name) { |
221 | 16 | lambda_frame.argument_bindings.push_back( |
222 | 16 | {_lambda_argument_binding.names[i], column_position}); |
223 | 16 | } |
224 | 17 | } |
225 | 16 | LambdaExecutionContext::FrameGuard lambda_frame_guard(context->lambda_execution_context(), |
226 | 16 | std::move(lambda_frame)); |
227 | | |
228 | | // If column_array is NULL, we know the array_data_column will not write any data, |
229 | | // so the column is empty. e.g. (x) -> concat('|', x + "1"): if we still executed the lambda function, the block's column row counts would not be equal: |
230 | | // the x column is empty, but "|" is a const literal whose column size is 1, so the block has 1 row while the x column has none, which would cause a core dump. |
231 | 18 | if (std::ranges::any_of(lambda_datas, [](const auto& v) { return v->empty(); })) { |
232 | 0 | DataTypePtr nested_type; |
233 | 0 | bool is_nullable = result_type->is_nullable(); |
234 | 0 | if (is_nullable) { |
235 | 0 | nested_type = |
236 | 0 | assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type(); |
237 | 0 | } else { |
238 | 0 | nested_type = result_type; |
239 | 0 | } |
240 | 0 | auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get()) |
241 | 0 | ->get_nested_type() |
242 | 0 | ->create_column(); |
243 | 0 | auto result_array_column = |
244 | 0 | ColumnArray::create(std::move(empty_nested_column), array_column_offset); |
245 | |
|
246 | 0 | if (is_nullable) { |
247 | 0 | result_column = ColumnNullable::create(std::move(result_array_column), |
248 | 0 | std::move(outside_null_map)); |
249 | 0 | } else { |
250 | 0 | result_column = std::move(result_array_column); |
251 | 0 | } |
252 | |
|
253 | 0 | return Status::OK(); |
254 | 0 | } |
255 | | |
256 | 16 | MutableColumnPtr result_col = nullptr; |
257 | 16 | DataTypePtr res_type; |
258 | | |
259 | | //process first row |
260 | 16 | args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1]; |
261 | 16 | args_info.cur_size = |
262 | 16 | (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start; |
263 | | |
264 | | // lambda block used to execute the lambda, reusing its memory across batches |
265 | 16 | Block lambda_block; |
266 | 16 | auto column_size = names.size(); |
267 | 16 | MutableColumns columns(column_size); |
268 | 16 | do { |
269 | 16 | bool mem_reuse = lambda_block.mem_reuse(); |
270 | 55 | for (int i = 0; i < column_size; i++) { |
271 | 39 | if (mem_reuse) { |
272 | 0 | columns[i] = lambda_block.get_by_position(i).column->assert_mutable(); |
273 | 39 | } else { |
274 | 39 | columns[i] = data_types[i]->create_column(); |
275 | 39 | } |
276 | 39 | } |
277 | | // fill at most batch_size rows of nested array data each time, in order to avoid memory overflow |
278 | 30 | while (columns[lambda_argument_base]->size() < batch_size) { |
279 | 30 | long max_step = batch_size - columns[lambda_argument_base]->size(); |
280 | 30 | long current_step = std::min( |
281 | 30 | max_step, (long)(args_info.cur_size - args_info.current_offset_in_array)); |
282 | 30 | size_t pos = args_info.array_start + args_info.current_offset_in_array; |
283 | 64 | for (int i = 0; i < arguments.size() && current_step > 0; ++i) { |
284 | 34 | columns[lambda_argument_base + i]->insert_range_from(*lambda_datas[i], pos, |
285 | 34 | current_step); |
286 | 34 | } |
287 | 30 | args_info.current_offset_in_array += current_step; |
288 | 30 | args_info.current_repeat_times += current_step; |
289 | 30 | if (args_info.current_offset_in_array >= args_info.cur_size) { |
290 | 30 | args_info.current_row_eos = true; |
291 | 30 | } |
292 | 30 | _repeat_input_columns(columns, block, args_info.current_repeat_times, |
293 | 30 | materialized_input_columns, args_info.current_row_idx); |
294 | 30 | args_info.current_repeat_times = 0; |
295 | 30 | if (args_info.current_row_eos) { |
296 | | //current row is end of array, move to next row |
297 | 30 | args_info.current_row_idx++; |
298 | 30 | args_info.current_offset_in_array = 0; |
299 | 30 | if (args_info.current_row_idx >= count) { |
300 | 16 | break; |
301 | 16 | } |
302 | 14 | args_info.current_row_eos = false; |
303 | 14 | args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1]; |
304 | 14 | args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] - |
305 | 14 | args_info.array_start; |
306 | 14 | } |
307 | 30 | } |
308 | | |
309 | 16 | if (!mem_reuse) { |
310 | 55 | for (int i = 0; i < column_size; ++i) { |
311 | 39 | lambda_block.insert( |
312 | 39 | ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i])); |
313 | 39 | } |
314 | 16 | } |
315 | | //3. child[0]->execute(new_block) |
316 | | |
317 | 16 | ColumnPtr res_col; |
318 | | // lambda body executes on the internal lambda_block, not the original block. |
319 | | // The outer expr_selector is irrelevant here, so pass nullptr. |
320 | 16 | RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr, |
321 | 16 | lambda_block.rows(), res_col)); |
322 | 16 | res_col = res_col->convert_to_full_column_if_const(); |
323 | 16 | res_type = children[0]->execute_type(&lambda_block); |
324 | | |
325 | 16 | if (!result_col) { |
326 | 16 | result_col = res_col->clone_empty(); |
327 | 16 | } |
328 | 16 | result_col->insert_range_from(*res_col, 0, res_col->size()); |
329 | 16 | lambda_block.clear_column_data(column_size); |
330 | 16 | } while (args_info.current_row_idx < count); |
331 | | |
332 | | //4. get the result column after execution, reassemble it into a new array column, and return. |
333 | 16 | if (result_type->is_nullable()) { |
334 | 0 | if (res_type->is_nullable()) { |
335 | 0 | result_column = ColumnNullable::create( |
336 | 0 | ColumnArray::create(std::move(result_col), array_column_offset), |
337 | 0 | std::move(outside_null_map)); |
338 | 0 | } else { |
339 | | // deal with eg: select array_map(x -> x is null, [null, 1, 2]); |
340 | | // need to create the nested column null map for column array |
341 | 0 | auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); |
342 | |
|
343 | 0 | result_column = ColumnNullable::create( |
344 | 0 | ColumnArray::create(ColumnNullable::create(std::move(result_col), |
345 | 0 | std::move(nested_null_map)), |
346 | 0 | array_column_offset), |
347 | 0 | std::move(outside_null_map)); |
348 | 0 | } |
349 | 16 | } else { |
350 | 16 | if (res_type->is_nullable()) { |
351 | 4 | result_column = ColumnArray::create(std::move(result_col), array_column_offset); |
352 | 12 | } else { |
353 | 12 | auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); |
354 | | |
355 | 12 | result_column = ColumnArray::create( |
356 | 12 | ColumnNullable::create(std::move(result_col), std::move(nested_null_map)), |
357 | 12 | array_column_offset); |
358 | 12 | } |
359 | 16 | } |
360 | 16 | return Status::OK(); |
361 | 16 | } |
362 | | |
363 | | private: |
364 | | struct LambdaArgumentBinding { |
365 | | bool bind_by_name = true; |
366 | | size_t argument_size = 0; |
367 | | std::vector<std::string> names; |
368 | | }; |
369 | | |
370 | | Status _prepare_lambda_argument_binding(const VExprSPtr& expr, size_t expected_argument_size, |
371 | 27 | LambdaArgumentBinding& argument_binding) const { |
372 | 27 | DORIS_CHECK_EQ(expr->node_type(), TExprNodeType::LAMBDA_FUNCTION_EXPR); |
373 | 27 | const auto* lambda_expr = assert_cast<const VLambdaFunctionExpr*>(expr.get()); |
374 | | |
375 | 27 | argument_binding.argument_size = 0; |
376 | 27 | argument_binding.names.clear(); |
377 | 27 | argument_binding.bind_by_name = lambda_expr->has_argument_names(); |
378 | | |
379 | 27 | if (!argument_binding.bind_by_name) { |
380 | 5 | if (_contains_nested_lambda_call(expr->get_child(0))) { |
381 | 1 | return Status::InternalError( |
382 | 1 | "Cannot resolve nested lambda argument without lambda metadata"); |
383 | 1 | } |
384 | 4 | argument_binding.argument_size = expected_argument_size; |
385 | 4 | argument_binding.names.resize(expected_argument_size); |
386 | 4 | return Status::OK(); |
387 | 5 | } |
388 | | |
389 | 22 | argument_binding.names = lambda_expr->argument_names(); |
390 | 22 | if (argument_binding.names.size() > expected_argument_size) { |
391 | 0 | return Status::InternalError( |
392 | 0 | "lambda argument metadata size exceeds parameter size, maximum={}, actual={}", |
393 | 0 | expected_argument_size, argument_binding.names.size()); |
394 | 0 | } |
395 | 22 | argument_binding.argument_size = argument_binding.names.size(); |
396 | 22 | if (std::ranges::any_of(argument_binding.names, |
397 | 23 | [](const auto& argument_name) { return argument_name.empty(); })) { |
398 | 0 | return Status::InternalError("lambda argument metadata contains empty name"); |
399 | 0 | } |
400 | 22 | return Status::OK(); |
401 | 22 | } |
402 | | |
403 | | Status _set_legacy_lambda_argument_gap(const VExprSPtr& expr, int lambda_argument_base, |
404 | 1 | size_t argument_size) const { |
405 | 1 | if (expr->is_column_ref()) { |
406 | 1 | auto* ref = static_cast<VColumnRef*>(expr.get()); |
407 | 1 | if (ref->column_id() >= 0 && static_cast<size_t>(ref->column_id()) < argument_size) { |
408 | 1 | const int argument_index = ref->column_id(); |
409 | 1 | ref->set_gap(lambda_argument_base + argument_index - ref->column_id()); |
410 | 1 | } |
411 | 1 | return Status::OK(); |
412 | 1 | } |
413 | | |
414 | 0 | for (const auto& child : expr->children()) { |
415 | 0 | RETURN_IF_ERROR( |
416 | 0 | _set_legacy_lambda_argument_gap(child, lambda_argument_base, argument_size)); |
417 | 0 | } |
418 | 0 | return Status::OK(); |
419 | 0 | } |
420 | | |
421 | 9 | bool _is_lambda_call_with_lambda_expr(const VExprSPtr& expr) const { |
422 | 9 | return expr->node_type() == TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR && |
423 | 9 | !expr->children().empty() && |
424 | 9 | expr->children()[0]->node_type() == TExprNodeType::LAMBDA_FUNCTION_EXPR; |
425 | 9 | } |
426 | | |
427 | 9 | bool _contains_nested_lambda_call(const VExprSPtr& expr) const { |
428 | 9 | if (_is_lambda_call_with_lambda_expr(expr)) { |
429 | 1 | return true; |
430 | 1 | } |
431 | 8 | return std::ranges::any_of(expr->children(), [this](const auto& child) { |
432 | 4 | return _contains_nested_lambda_call(child); |
433 | 4 | }); |
434 | 9 | } |
435 | | |
436 | | void _repeat_input_columns(std::vector<MutableColumnPtr>& columns, const Block* block, |
437 | | int repeat_times, |
438 | | const std::vector<bool>& materialized_input_columns, |
439 | 30 | int64_t row_idx) const { |
440 | 30 | if (!repeat_times || materialized_input_columns.empty()) { |
441 | 10 | return; |
442 | 10 | } |
443 | 71 | for (size_t i = 0; i < materialized_input_columns.size(); i++) { |
444 | 51 | if (!materialized_input_columns[i]) { |
445 | 23 | columns[i]->resize(columns[i]->size() + repeat_times); |
446 | 23 | continue; |
447 | 23 | } |
448 | 28 | DORIS_CHECK(block != nullptr); |
449 | 28 | auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const(); |
450 | 28 | if (check_and_get_column<ColumnNothing>(src_column.get())) { |
451 | | // A ColumnNothing in the outer block is a placeholder for an unmaterialized |
452 | | // virtual column. Keep it as a placeholder in the lambda block as well, so |
453 | | // VirtualSlotRef can still materialize it lazily if the lambda body reads it. |
454 | 0 | if (!check_and_get_column<ColumnNothing>(columns[i].get())) { |
455 | 0 | columns[i] = ColumnNothing::create(columns[i]->size()); |
456 | 0 | } |
457 | 0 | } |
458 | 28 | columns[i]->insert_many_from(*src_column, row_idx, repeat_times); |
459 | 28 | } |
460 | 20 | } |
461 | | |
462 | | LambdaArgumentBinding _lambda_argument_binding; |
463 | | }; |
464 | | |
465 | 1 | void register_function_array_map(doris::LambdaFunctionFactory& factory) { |
466 | 1 | factory.register_function<ArrayMapFunction>(); |
467 | 1 | } |
468 | | |
469 | | } // namespace doris |