Coverage Report

Created: 2026-04-13 22:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/lambda_function/varray_map_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <memory>
19
#include <string>
20
#include <vector>
21
22
#include "common/status.h"
23
#include "core/assert_cast.h"
24
#include "core/block/block.h"
25
#include "core/block/column_numbers.h"
26
#include "core/block/column_with_type_and_name.h"
27
#include "core/block/columns_with_type_and_name.h"
28
#include "core/column/column.h"
29
#include "core/column/column_array.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_vector.h"
32
#include "core/data_type/data_type.h"
33
#include "core/data_type/data_type_array.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_number.h"
36
#include "exec/common/util.hpp"
37
#include "exprs/aggregate/aggregate_function.h"
38
#include "exprs/lambda_function/lambda_function.h"
39
#include "exprs/lambda_function/lambda_function_factory.h"
40
#include "exprs/vcolumn_ref.h"
41
#include "exprs/vslot_ref.h"
42
43
namespace doris {
44
class VExprContext;
45
46
// extend a block with all required parameters
47
struct LambdaArgs {
48
    // the lambda function need the column ids of all the slots
49
    std::vector<int> output_slot_ref_indexs;
50
    // which line is extended to the original block
51
    int64_t current_row_idx = 0;
52
    // when a block is filled, the array may be truncated, recording where it was truncated
53
    int64_t current_offset_in_array = 0;
54
    // the beginning position of the array
55
    size_t array_start = 0;
56
    // the size of the array
57
    int64_t cur_size = 0;
58
    // offset of column array
59
    const ColumnArray::Offsets64* offsets_ptr = nullptr;
60
    // expend data of repeat times
61
    int current_repeat_times = 0;
62
    // whether the current row of the original block has been extended
63
    bool current_row_eos = false;
64
};
65
66
class ArrayMapFunction : public LambdaFunction {
67
    ENABLE_FACTORY_CREATOR(ArrayMapFunction);
68
69
public:
70
0
    ~ArrayMapFunction() override = default;
71
72
    static constexpr auto name = "array_map";
73
74
0
    static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); }
75
76
0
    std::string get_name() const override { return name; }
77
78
    Status execute(VExprContext* context, const Block* block, Selector* expr_selector, size_t count,
79
                   ColumnPtr& result_column, const DataTypePtr& result_type,
80
0
                   const VExprSPtrs& children) const override {
81
0
        LambdaArgs args_info;
82
        // collect used slot ref in lambda function body
83
0
        std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs;
84
0
        _collect_slot_ref_column_id(children[0], output_slot_ref_indexs);
85
86
0
        int gap = 0;
87
0
        if (!output_slot_ref_indexs.empty()) {
88
0
            auto max_id =
89
0
                    std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end());
90
0
            gap = *max_id + 1;
91
0
            _set_column_ref_column_id(children[0], gap);
92
0
        }
93
94
0
        std::vector<std::string> names(gap);
95
0
        DataTypes data_types(gap);
96
97
0
        for (int i = 0; i < gap; ++i) {
98
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
99
0
                names[i] = block->get_by_position(i).name;
100
0
                data_types[i] = block->get_by_position(i).type;
101
0
            } else {
102
                // padding some mock data to hold the position, like call block#rows function need
103
0
                names[i] = "temp";
104
0
                data_types[i] = std::make_shared<DataTypeUInt8>();
105
0
            }
106
0
        }
107
108
        ///* array_map(lambda,arg1,arg2,.....) *///
109
        //1. child[1:end]->execute(src_block)
110
0
        ColumnsWithTypeAndName arguments(children.size() - 1);
111
0
        for (int i = 1; i < children.size(); ++i) {
112
0
            ColumnPtr column;
113
0
            RETURN_IF_ERROR(
114
0
                    children[i]->execute_column(context, block, expr_selector, count, column));
115
0
            arguments[i - 1].column = column;
116
0
            arguments[i - 1].type = children[i]->execute_type(block);
117
0
            arguments[i - 1].name = children[i]->expr_name();
118
0
        }
119
120
        // used for save column array outside null map
121
0
        auto outside_null_map = ColumnUInt8::create(
122
0
                arguments[0].column->convert_to_full_column_if_const()->size(), 0);
123
        // offset column
124
0
        MutableColumnPtr array_column_offset;
125
0
        size_t nested_array_column_rows = 0;
126
0
        ColumnPtr first_array_offsets = nullptr;
127
        //2. get the result column from executed expr, and the needed is nested column of array
128
0
        std::vector<ColumnPtr> lambda_datas(arguments.size());
129
130
0
        for (int i = 0; i < arguments.size(); ++i) {
131
0
            const auto& array_column_type_name = arguments[i];
132
0
            auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
133
0
            auto type_array = array_column_type_name.type;
134
0
            if (type_array->is_nullable()) {
135
                // get the nullmap of nullable column
136
                // hold the null column instead of a reference 'cause `column_array` will be assigned and freed below.
137
0
                auto column_array_nullmap =
138
0
                        assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();
139
140
                // get the array column from nullable column
141
0
                column_array = assert_cast<const ColumnNullable*>(column_array.get())
142
0
                                       ->get_nested_column_ptr();
143
144
                // get the nested type from nullable type
145
0
                type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
146
0
                                     ->get_nested_type();
147
148
                // need to union nullmap from all columns
149
0
                VectorizedUtils::update_null_map(
150
0
                        outside_null_map->get_data(),
151
0
                        assert_cast<const ColumnUInt8&>(*column_array_nullmap).get_data());
152
0
            }
153
154
            // here is the array column
155
0
            const auto& col_array = assert_cast<const ColumnArray&>(*column_array);
156
0
            const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
157
158
0
            if (i == 0) {
159
0
                nested_array_column_rows = col_array.get_data_ptr()->size();
160
0
                first_array_offsets = col_array.get_offsets_ptr();
161
0
                const auto& off_data = assert_cast<const ColumnArray::ColumnOffsets&>(
162
0
                        col_array.get_offsets_column());
163
0
                array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size());
164
0
                args_info.offsets_ptr = &col_array.get_offsets();
165
0
            } else {
166
                // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
167
                // c_array1: [0,1,2,3,4,5,6,7,8,9]
168
0
                const auto& array_offsets =
169
0
                        assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
170
0
                                .get_data();
171
0
                if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
172
0
                    (!array_offsets.empty() &&
173
0
                     memcmp(array_offsets.data(), col_array.get_offsets().data(),
174
0
                            sizeof(array_offsets[0]) * array_offsets.size()) != 0)) {
175
0
                    return Status::InvalidArgument(
176
0
                            "in array map function, the input column size "
177
0
                            "are "
178
0
                            "not equal completely, nested column data rows 1st size is {}, {}th "
179
0
                            "size is {}.",
180
0
                            nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
181
0
                }
182
0
            }
183
0
            lambda_datas[i] = col_array.get_data_ptr();
184
0
            names.push_back("R" + array_column_type_name.name);
185
0
            data_types.push_back(col_type.get_nested_type());
186
0
        }
187
188
        // if column_array is NULL, we know the array_data_column will not write any data,
189
        // so the column is empty. eg : (x) -> concat('|',x + "1"). if still execute the lambda function, will cause the bolck rows are not equal
190
        // the x column is empty, but "|" is const literal, size of column is 1, so the block rows is 1, but the x column is empty, will be coredump.
191
0
        if (std::any_of(lambda_datas.begin(), lambda_datas.end(),
192
0
                        [](const auto& v) { return v->empty(); })) {
193
0
            DataTypePtr nested_type;
194
0
            bool is_nullable = result_type->is_nullable();
195
0
            if (is_nullable) {
196
0
                nested_type =
197
0
                        assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
198
0
            } else {
199
0
                nested_type = result_type;
200
0
            }
201
0
            auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
202
0
                                               ->get_nested_type()
203
0
                                               ->create_column();
204
0
            auto result_array_column = ColumnArray::create(std::move(empty_nested_column),
205
0
                                                           std::move(array_column_offset));
206
207
0
            if (is_nullable) {
208
0
                result_column = ColumnNullable::create(std::move(result_array_column),
209
0
                                                       std::move(outside_null_map));
210
0
            } else {
211
0
                result_column = std::move(result_array_column);
212
0
            }
213
214
0
            return Status::OK();
215
0
        }
216
217
0
        MutableColumnPtr result_col = nullptr;
218
0
        DataTypePtr res_type;
219
220
        //process first row
221
0
        args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
222
0
        args_info.cur_size =
223
0
                (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;
224
225
        // lambda block to exectute the lambda, and reuse the memory
226
0
        Block lambda_block;
227
0
        auto column_size = names.size();
228
0
        MutableColumns columns(column_size);
229
0
        do {
230
0
            bool mem_reuse = lambda_block.mem_reuse();
231
0
            for (int i = 0; i < column_size; i++) {
232
0
                if (mem_reuse) {
233
0
                    columns[i] = lambda_block.get_by_position(i).column->assume_mutable();
234
0
                } else {
235
0
                    if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) {
236
                        // TODO: maybe could create const column, so not insert_many_from when extand data
237
                        // but now here handle batch_size of array nested data every time, so maybe have different rows
238
0
                        columns[i] = data_types[i]->create_column();
239
0
                    } else {
240
0
                        columns[i] = data_types[i]
241
0
                                             ->create_column_const_with_default_value(0)
242
0
                                             ->assume_mutable();
243
0
                    }
244
0
                }
245
0
            }
246
            // batch_size of array nested data every time inorder to avoid memory overflow
247
0
            while (columns[gap]->size() < batch_size) {
248
0
                long max_step = batch_size - columns[gap]->size();
249
0
                long current_step = std::min(
250
0
                        max_step, (long)(args_info.cur_size - args_info.current_offset_in_array));
251
0
                size_t pos = args_info.array_start + args_info.current_offset_in_array;
252
0
                for (int i = 0; i < arguments.size() && current_step > 0; ++i) {
253
0
                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step);
254
0
                }
255
0
                args_info.current_offset_in_array += current_step;
256
0
                args_info.current_repeat_times += current_step;
257
0
                if (args_info.current_offset_in_array >= args_info.cur_size) {
258
0
                    args_info.current_row_eos = true;
259
0
                }
260
0
                _extend_data(columns, block, args_info.current_repeat_times, gap,
261
0
                             args_info.current_row_idx, output_slot_ref_indexs);
262
0
                args_info.current_repeat_times = 0;
263
0
                if (args_info.current_row_eos) {
264
                    //current row is end of array, move to next row
265
0
                    args_info.current_row_idx++;
266
0
                    args_info.current_offset_in_array = 0;
267
0
                    if (args_info.current_row_idx >= count) {
268
0
                        break;
269
0
                    }
270
0
                    args_info.current_row_eos = false;
271
0
                    args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
272
0
                    args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
273
0
                                         args_info.array_start;
274
0
                }
275
0
            }
276
277
0
            if (!mem_reuse) {
278
0
                for (int i = 0; i < column_size; ++i) {
279
0
                    lambda_block.insert(
280
0
                            ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i]));
281
0
                }
282
0
            }
283
            //3. child[0]->execute(new_block)
284
285
0
            ColumnPtr res_col;
286
            // lambda body executes on the internal lambda_block, not the original block.
287
            // The outer expr_selector is irrelevant here, so pass nullptr.
288
0
            RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr,
289
0
                                                        lambda_block.rows(), res_col));
290
0
            res_col = res_col->convert_to_full_column_if_const();
291
0
            res_type = children[0]->execute_type(&lambda_block);
292
293
0
            if (!result_col) {
294
0
                result_col = res_col->clone_empty();
295
0
            }
296
0
            result_col->insert_range_from(*res_col, 0, res_col->size());
297
0
            lambda_block.clear_column_data(column_size);
298
0
        } while (args_info.current_row_idx < count);
299
300
        //4. get the result column after execution, reassemble it into a new array column, and return.
301
0
        if (result_type->is_nullable()) {
302
0
            if (res_type->is_nullable()) {
303
0
                result_column = ColumnNullable::create(
304
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset)),
305
0
                        std::move(outside_null_map));
306
0
            } else {
307
                // deal with eg: select array_map(x -> x is null, [null, 1, 2]);
308
                // need to create the nested column null map for column array
309
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
310
311
0
                result_column = ColumnNullable::create(
312
0
                        ColumnArray::create(ColumnNullable::create(std::move(result_col),
313
0
                                                                   std::move(nested_null_map)),
314
0
                                            std::move(array_column_offset)),
315
0
                        std::move(outside_null_map));
316
0
            }
317
0
        } else {
318
0
            if (res_type->is_nullable()) {
319
0
                result_column =
320
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset));
321
0
            } else {
322
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
323
324
0
                result_column = ColumnArray::create(
325
0
                        ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
326
0
                        std::move(array_column_offset));
327
0
            }
328
0
        }
329
0
        return Status::OK();
330
0
    }
331
332
private:
333
0
    bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const {
334
0
        const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id);
335
0
        return it != output_slot_ref_indexs.end();
336
0
    }
337
338
0
    void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
339
0
        for (const auto& child : expr->children()) {
340
0
            if (child->is_column_ref()) {
341
0
                auto* ref = static_cast<VColumnRef*>(child.get());
342
0
                ref->set_gap(gap);
343
0
            } else {
344
0
                _set_column_ref_column_id(child, gap);
345
0
            }
346
0
        }
347
0
    }
348
349
    void _collect_slot_ref_column_id(VExprSPtr expr,
350
0
                                     std::vector<int>& output_slot_ref_indexs) const {
351
0
        for (const auto& child : expr->children()) {
352
0
            if (child->is_slot_ref()) {
353
0
                const auto* ref = static_cast<VSlotRef*>(child.get());
354
0
                output_slot_ref_indexs.push_back(ref->column_id());
355
0
            } else {
356
0
                _collect_slot_ref_column_id(child, output_slot_ref_indexs);
357
0
            }
358
0
        }
359
0
    }
360
361
    void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block,
362
                      int current_repeat_times, int size, int64_t current_row_idx,
363
0
                      const std::vector<int>& output_slot_ref_indexs) const {
364
0
        if (!current_repeat_times || !size) {
365
0
            return;
366
0
        }
367
0
        for (int i = 0; i < size; i++) {
368
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
369
0
                auto src_column =
370
0
                        block->get_by_position(i).column->convert_to_full_column_if_const();
371
0
                columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times);
372
0
            } else {
373
                // must be column const
374
0
                DCHECK(is_column_const(*columns[i]));
375
0
                columns[i]->resize(columns[i]->size() + current_repeat_times);
376
0
            }
377
0
        }
378
0
    }
379
};
380
381
0
// Makes `array_map` (ArrayMapFunction) available through the lambda function factory.
void register_function_array_map(doris::LambdaFunctionFactory& factory) {
    doris::LambdaFunctionFactory& registry = factory;
    registry.register_function<ArrayMapFunction>();
}
384
385
} // namespace doris