be/src/exprs/lambda_function/varray_map_function.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <memory> |
19 | | #include <string> |
20 | | #include <vector> |
21 | | |
22 | | #include "common/status.h" |
23 | | #include "core/assert_cast.h" |
24 | | #include "core/block/block.h" |
25 | | #include "core/block/column_numbers.h" |
26 | | #include "core/block/column_with_type_and_name.h" |
27 | | #include "core/block/columns_with_type_and_name.h" |
28 | | #include "core/column/column.h" |
29 | | #include "core/column/column_array.h" |
30 | | #include "core/column/column_nullable.h" |
31 | | #include "core/column/column_vector.h" |
32 | | #include "core/data_type/data_type.h" |
33 | | #include "core/data_type/data_type_array.h" |
34 | | #include "core/data_type/data_type_nullable.h" |
35 | | #include "core/data_type/data_type_number.h" |
36 | | #include "exec/common/util.hpp" |
37 | | #include "exprs/aggregate/aggregate_function.h" |
38 | | #include "exprs/lambda_function/lambda_function.h" |
39 | | #include "exprs/lambda_function/lambda_function_factory.h" |
40 | | #include "exprs/vcolumn_ref.h" |
41 | | #include "exprs/vslot_ref.h" |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | class VExprContext; |
46 | | |
47 | | // bookkeeping used while expanding rows of the source block into the block the lambda is executed on |
48 | | struct LambdaArgs { |
49 | | // column ids of every slot ref that is used by the lambda body |
50 | | std::vector<int> output_slot_ref_indexs; |
51 | | // index of the source-block row whose array is currently being expanded |
52 | | int64_t current_row_idx = 0; |
53 | | // number of elements of the current row's array already copied; non-zero when a batch filled up in the middle of an array |
54 | | int64_t current_offset_in_array = 0; |
55 | | // position in the nested data column where the current row's array begins |
56 | | size_t array_start = 0; |
57 | | // number of elements in the current row's array |
58 | | int64_t cur_size = 0; |
59 | | // offsets of the first array argument (set while processing argument 0) |
60 | | const ColumnArray::Offsets64* offsets_ptr = nullptr; |
61 | | // how many times the current row's slot values still have to be repeated in the lambda block |
62 | | int current_repeat_times = 0; |
63 | | // true once every element of the current row's array has been consumed |
64 | | bool current_row_eos = false; |
65 | | }; |
66 | | |
67 | | class ArrayMapFunction : public LambdaFunction { |
68 | | ENABLE_FACTORY_CREATOR(ArrayMapFunction); |
69 | | |
70 | | public: |
71 | 0 | ~ArrayMapFunction() override = default; |
72 | | |
73 | | static constexpr auto name = "array_map"; |
74 | | /* Builds a new ArrayMapFunction and returns it as a LambdaFunctionPtr. */ |
75 | 0 | static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); } |
76 | | /* Returns the static name of this lambda function, i.e. array_map. */ |
77 | 0 | std::string get_name() const override { return name; } |
78 | | |
79 | | Status execute(VExprContext* context, const Block* block, Selector* expr_selector, size_t count, |
80 | | ColumnPtr& result_column, const DataTypePtr& result_type, |
81 | 0 | const VExprSPtrs& children) const override { |
82 | 0 | LambdaArgs args_info; |
83 | | // children[0] is the lambda body, children[1..] are the array arguments; first collect the column ids of all slot refs used in the lambda body |
84 | 0 | std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs; |
85 | 0 | _collect_slot_ref_column_id(children[0], output_slot_ref_indexs); |
86 | |
|
87 | 0 | int gap = 0; |
88 | 0 | if (!output_slot_ref_indexs.empty()) { |
89 | 0 | auto max_id = |
90 | 0 | std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end()); |
91 | 0 | gap = *max_id + 1; |
92 | 0 | _set_column_ref_column_id(children[0], gap); |
93 | 0 | } |
94 | |
|
95 | 0 | std::vector<std::string> names(gap); |
96 | 0 | DataTypes data_types(gap); |
97 | |
|
98 | 0 | for (int i = 0; i < gap; ++i) { |
99 | 0 | if (_contains_column_id(output_slot_ref_indexs, i)) { |
100 | 0 | names[i] = block->get_by_position(i).name; |
101 | 0 | data_types[i] = block->get_by_position(i).type; |
102 | 0 | } else { |
103 | | // column id not used by the lambda: pad with a placeholder UInt8 column named temp so the position is still occupied (e.g. needed when the block's row count is queried) |
104 | 0 | names[i] = "temp"; |
105 | 0 | data_types[i] = std::make_shared<DataTypeUInt8>(); |
106 | 0 | } |
107 | 0 | } |
108 | | |
109 | | ///* array_map(lambda,arg1,arg2,.....) */// |
110 | | //1. evaluate children[1..] (the array arguments) on the source block |
111 | 0 | ColumnsWithTypeAndName arguments(children.size() - 1); |
112 | 0 | for (int i = 1; i < children.size(); ++i) { |
113 | 0 | ColumnPtr column; |
114 | 0 | RETURN_IF_ERROR( |
115 | 0 | children[i]->execute_column(context, block, expr_selector, count, column)); |
116 | 0 | arguments[i - 1].column = column; |
117 | 0 | arguments[i - 1].type = children[i]->execute_type(block); |
118 | 0 | arguments[i - 1].name = children[i]->expr_name(); |
119 | 0 | } |
120 | | |
121 | | // null map of the outer array columns, sized from the first array argument. NOTE(review): arguments[0] is read unchecked, presumably callers guarantee at least one array argument - confirm |
122 | 0 | auto outside_null_map = ColumnUInt8::create( |
123 | 0 | arguments[0].column->convert_to_full_column_if_const()->size(), 0); |
124 | | // offsets column of the result array, cloned from the first array argument below |
125 | 0 | MutableColumnPtr array_column_offset; |
126 | 0 | size_t nested_array_column_rows = 0; |
127 | 0 | ColumnPtr first_array_offsets = nullptr; |
128 | | //2. for every evaluated argument, unwrap const/nullable and keep the nested data column of the array |
129 | 0 | std::vector<ColumnPtr> lambda_datas(arguments.size()); |
130 | |
|
131 | 0 | for (int i = 0; i < arguments.size(); ++i) { |
132 | 0 | const auto& array_column_type_name = arguments[i]; |
133 | 0 | auto column_array = array_column_type_name.column->convert_to_full_column_if_const(); |
134 | 0 | auto type_array = array_column_type_name.type; |
135 | 0 | if (type_array->is_nullable()) { |
136 | | // get the null map of the nullable column |
137 | | // hold the null map by ColumnPtr instead of by reference, because `column_array` is reassigned just below |
138 | 0 | auto column_array_nullmap = |
139 | 0 | assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr(); |
140 | | |
141 | | // get the array column from nullable column |
142 | 0 | column_array = assert_cast<const ColumnNullable*>(column_array.get()) |
143 | 0 | ->get_nested_column_ptr(); |
144 | | |
145 | | // get the nested type from nullable type |
146 | 0 | type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get()) |
147 | 0 | ->get_nested_type(); |
148 | | |
149 | | // merge this column's null map into the shared outer null map (union over all array arguments) |
150 | 0 | VectorizedUtils::update_null_map( |
151 | 0 | outside_null_map->get_data(), |
152 | 0 | assert_cast<const ColumnUInt8&>(*column_array_nullmap).get_data()); |
153 | 0 | } |
154 | | |
155 | | // here is the array column |
156 | 0 | const auto& col_array = assert_cast<const ColumnArray&>(*column_array); |
157 | 0 | const auto& col_type = assert_cast<const DataTypeArray&>(*type_array); |
158 | |
|
159 | 0 | if (i == 0) { |
160 | 0 | nested_array_column_rows = col_array.get_data_ptr()->size(); |
161 | 0 | first_array_offsets = col_array.get_offsets_ptr(); |
162 | 0 | const auto& off_data = assert_cast<const ColumnArray::ColumnOffsets&>( |
163 | 0 | col_array.get_offsets_column()); |
164 | 0 | array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size()); |
165 | 0 | args_info.offsets_ptr = &col_array.get_offsets(); |
166 | 0 | } else { |
167 | | // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2; |
168 | | // c_array1: [0,1,2,3,4,5,6,7,8,9] |
169 | 0 | const auto& array_offsets = |
170 | 0 | assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets) |
171 | 0 | .get_data(); |
172 | 0 | if (nested_array_column_rows != col_array.get_data_ptr()->size() || |
173 | 0 | (!array_offsets.empty() && |
174 | 0 | memcmp(array_offsets.data(), col_array.get_offsets().data(), |
175 | 0 | sizeof(array_offsets[0]) * array_offsets.size()) != 0)) { |
176 | 0 | return Status::InvalidArgument( |
177 | 0 | "in array map function, the input column size " |
178 | 0 | "are " |
179 | 0 | "not equal completely, nested column data rows 1st size is {}, {}th " |
180 | 0 | "size is {}.", |
181 | 0 | nested_array_column_rows, i + 1, col_array.get_data_ptr()->size()); |
182 | 0 | } |
183 | 0 | } |
184 | 0 | lambda_datas[i] = col_array.get_data_ptr(); |
185 | 0 | names.push_back("R" + array_column_type_name.name); |
186 | 0 | data_types.push_back(col_type.get_nested_type()); |
187 | 0 | } |
188 | | |
189 | | // if any nested data column is empty (e.g. the array column is NULL) nothing would be written, so the lambda is not executed at all. |
190 | | // eg : (x) -> concat('|',x + "1"): "|" is a const literal whose column size is 1 while the x column is empty, so the rows of the block would not be equal |
191 | | // and executing the lambda on such a block would core dump; return an empty nested column with the original offsets instead. |
192 | 0 | if (std::any_of(lambda_datas.begin(), lambda_datas.end(), |
193 | 0 | [](const auto& v) { return v->empty(); })) { |
194 | 0 | DataTypePtr nested_type; |
195 | 0 | bool is_nullable = result_type->is_nullable(); |
196 | 0 | if (is_nullable) { |
197 | 0 | nested_type = |
198 | 0 | assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type(); |
199 | 0 | } else { |
200 | 0 | nested_type = result_type; |
201 | 0 | } |
202 | 0 | auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get()) |
203 | 0 | ->get_nested_type() |
204 | 0 | ->create_column(); |
205 | 0 | auto result_array_column = ColumnArray::create(std::move(empty_nested_column), |
206 | 0 | std::move(array_column_offset)); |
207 | |
|
208 | 0 | if (is_nullable) { |
209 | 0 | result_column = ColumnNullable::create(std::move(result_array_column), |
210 | 0 | std::move(outside_null_map)); |
211 | 0 | } else { |
212 | 0 | result_column = std::move(result_array_column); |
213 | 0 | } |
214 | |
|
215 | 0 | return Status::OK(); |
216 | 0 | } |
217 | | |
218 | 0 | MutableColumnPtr result_col = nullptr; |
219 | 0 | DataTypePtr res_type; |
220 | | |
221 | | // process the first row; current_row_idx is 0 here so this reads offsets[-1]. NOTE(review): presumably the offsets container is padded so that index -1 yields 0 - TODO confirm |
222 | 0 | args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1]; |
223 | 0 | args_info.cur_size = |
224 | 0 | (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start; |
225 | | |
226 | | // block the lambda body is executed on; its columns are reused across batches |
227 | 0 | Block lambda_block; |
228 | 0 | auto column_size = names.size(); |
229 | 0 | MutableColumns columns(column_size); |
230 | 0 | do { |
231 | 0 | bool mem_reuse = lambda_block.mem_reuse(); |
232 | 0 | for (int i = 0; i < column_size; i++) { |
233 | 0 | if (mem_reuse) { |
234 | 0 | columns[i] = lambda_block.get_by_position(i).column->assume_mutable(); |
235 | 0 | } else { |
236 | 0 | if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) { |
237 | | // TODO: maybe could create a const column, so no insert_many_from is needed when extending data |
238 | | // but each pass handles up to batch_size nested array elements, so batches may have different row counts |
239 | 0 | columns[i] = data_types[i]->create_column(); |
240 | 0 | } else { |
241 | 0 | columns[i] = data_types[i] |
242 | 0 | ->create_column_const_with_default_value(0) |
243 | 0 | ->assume_mutable(); |
244 | 0 | } |
245 | 0 | } |
246 | 0 | } |
247 | | // feed at most batch_size nested array elements per pass in order to avoid memory overflow |
248 | 0 | while (columns[gap]->size() < batch_size) { |
249 | 0 | long max_step = batch_size - columns[gap]->size(); |
250 | 0 | long current_step = std::min( |
251 | 0 | max_step, (long)(args_info.cur_size - args_info.current_offset_in_array)); |
252 | 0 | size_t pos = args_info.array_start + args_info.current_offset_in_array; |
253 | 0 | for (int i = 0; i < arguments.size() && current_step > 0; ++i) { |
254 | 0 | columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step); |
255 | 0 | } |
256 | 0 | args_info.current_offset_in_array += current_step; |
257 | 0 | args_info.current_repeat_times += current_step; |
258 | 0 | if (args_info.current_offset_in_array >= args_info.cur_size) { |
259 | 0 | args_info.current_row_eos = true; |
260 | 0 | } |
261 | 0 | _extend_data(columns, block, args_info.current_repeat_times, gap, |
262 | 0 | args_info.current_row_idx, output_slot_ref_indexs); |
263 | 0 | args_info.current_repeat_times = 0; |
264 | 0 | if (args_info.current_row_eos) { |
265 | | // the current row's array is exhausted, move on to the next row |
266 | 0 | args_info.current_row_idx++; |
267 | 0 | args_info.current_offset_in_array = 0; |
268 | 0 | if (args_info.current_row_idx >= count) { |
269 | 0 | break; |
270 | 0 | } |
271 | 0 | args_info.current_row_eos = false; |
272 | 0 | args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1]; |
273 | 0 | args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] - |
274 | 0 | args_info.array_start; |
275 | 0 | } |
276 | 0 | } |
277 | |
|
278 | 0 | if (!mem_reuse) { |
279 | 0 | for (int i = 0; i < column_size; ++i) { |
280 | 0 | lambda_block.insert( |
281 | 0 | ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i])); |
282 | 0 | } |
283 | 0 | } |
284 | | //3. execute the lambda body (children[0]) on the lambda block |
285 | |
|
286 | 0 | ColumnPtr res_col; |
287 | | // lambda body executes on the internal lambda_block, not the original block. |
288 | | // The outer expr_selector is irrelevant here, so pass nullptr. |
289 | 0 | RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr, |
290 | 0 | lambda_block.rows(), res_col)); |
291 | 0 | res_col = res_col->convert_to_full_column_if_const(); |
292 | 0 | res_type = children[0]->execute_type(&lambda_block); |
293 | |
|
294 | 0 | if (!result_col) { |
295 | 0 | result_col = res_col->clone_empty(); |
296 | 0 | } |
297 | 0 | result_col->insert_range_from(*res_col, 0, res_col->size()); |
298 | 0 | lambda_block.clear_column_data(column_size); |
299 | 0 | } while (args_info.current_row_idx < count); |
300 | | |
301 | | //4. reassemble the accumulated lambda results into a new array column (using the offsets of the first argument) and return it. |
302 | 0 | if (result_type->is_nullable()) { |
303 | 0 | if (res_type->is_nullable()) { |
304 | 0 | result_column = ColumnNullable::create( |
305 | 0 | ColumnArray::create(std::move(result_col), std::move(array_column_offset)), |
306 | 0 | std::move(outside_null_map)); |
307 | 0 | } else { |
308 | | // deal with eg: select array_map(x -> x is null, [null, 1, 2]); |
309 | | // the lambda result is not nullable, so wrap it with an all-zero null map to form the nullable nested column of the array |
310 | 0 | auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); |
311 | |
|
312 | 0 | result_column = ColumnNullable::create( |
313 | 0 | ColumnArray::create(ColumnNullable::create(std::move(result_col), |
314 | 0 | std::move(nested_null_map)), |
315 | 0 | std::move(array_column_offset)), |
316 | 0 | std::move(outside_null_map)); |
317 | 0 | } |
318 | 0 | } else { |
319 | 0 | if (res_type->is_nullable()) { |
320 | 0 | result_column = |
321 | 0 | ColumnArray::create(std::move(result_col), std::move(array_column_offset)); |
322 | 0 | } else { |
323 | 0 | auto nested_null_map = ColumnUInt8::create(result_col->size(), 0); |
324 | |
|
325 | 0 | result_column = ColumnArray::create( |
326 | 0 | ColumnNullable::create(std::move(result_col), std::move(nested_null_map)), |
327 | 0 | std::move(array_column_offset)); |
328 | 0 | } |
329 | 0 | } |
330 | 0 | return Status::OK(); |
331 | 0 | } |
332 | | |
333 | | private: |
334 | 0 | bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const { /* Returns true when id occurs in output_slot_ref_indexs (linear std::find scan). */ |
335 | 0 | const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id); |
336 | 0 | return it != output_slot_ref_indexs.end(); |
337 | 0 | } |
338 | | /* Recursively walks the children of expr and calls set_gap(gap) on every child that reports is_column_ref(); other children are descended into. expr itself is not inspected. */ |
339 | 0 | void _set_column_ref_column_id(VExprSPtr expr, int gap) const { |
340 | 0 | for (const auto& child : expr->children()) { |
341 | 0 | if (child->is_column_ref()) { |
342 | 0 | auto* ref = static_cast<VColumnRef*>(child.get()); |
343 | 0 | ref->set_gap(gap); |
344 | 0 | } else { |
345 | 0 | _set_column_ref_column_id(child, gap); |
346 | 0 | } |
347 | 0 | } |
348 | 0 | } |
349 | | /* Recursively walks the children of expr and appends column_id() of every child that reports is_slot_ref() to output_slot_ref_indexs; duplicates are not removed and expr itself is not inspected. */ |
350 | | void _collect_slot_ref_column_id(VExprSPtr expr, |
351 | 0 | std::vector<int>& output_slot_ref_indexs) const { |
352 | 0 | for (const auto& child : expr->children()) { |
353 | 0 | if (child->is_slot_ref()) { |
354 | 0 | const auto* ref = static_cast<VSlotRef*>(child.get()); |
355 | 0 | output_slot_ref_indexs.push_back(ref->column_id()); |
356 | 0 | } else { |
357 | 0 | _collect_slot_ref_column_id(child, output_slot_ref_indexs); |
358 | 0 | } |
359 | 0 | } |
360 | 0 | } |
361 | | /* Grows each of the first `size` columns by current_repeat_times rows: positions listed in output_slot_ref_indexs repeat row current_row_idx of the matching source block column, all other positions are only resized. Does nothing when current_repeat_times or size is 0. */ |
362 | | void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block, |
363 | | int current_repeat_times, int size, int64_t current_row_idx, |
364 | 0 | const std::vector<int>& output_slot_ref_indexs) const { |
365 | 0 | if (!current_repeat_times || !size) { |
366 | 0 | return; |
367 | 0 | } |
368 | 0 | for (int i = 0; i < size; i++) { |
369 | 0 | if (_contains_column_id(output_slot_ref_indexs, i)) { |
370 | 0 | auto src_column = |
371 | 0 | block->get_by_position(i).column->convert_to_full_column_if_const(); |
372 | 0 | columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times); |
373 | 0 | } else { |
374 | | // placeholder position: the column is expected to be a const column (checked by the DCHECK), so growing it only needs a resize |
375 | 0 | DCHECK(is_column_const(*columns[i])); |
376 | 0 | columns[i]->resize(columns[i]->size() + current_repeat_times); |
377 | 0 | } |
378 | 0 | } |
379 | 0 | } |
380 | | }; |
381 | | /* Registers ArrayMapFunction with the given lambda function factory. */ |
382 | 0 | void register_function_array_map(doris::LambdaFunctionFactory& factory) { |
383 | 0 | factory.register_function<ArrayMapFunction>(); |
384 | 0 | } |
385 | | |
386 | | #include "common/compile_check_end.h" |
387 | | } // namespace doris |