Coverage Report

Created: 2026-06-02 13:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/lambda_function/varray_map_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <memory>
19
#include <string>
20
#include <vector>
21
22
#include "common/status.h"
23
#include "core/assert_cast.h"
24
#include "core/block/block.h"
25
#include "core/block/column_numbers.h"
26
#include "core/block/column_with_type_and_name.h"
27
#include "core/block/columns_with_type_and_name.h"
28
#include "core/column/column.h"
29
#include "core/column/column_array.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_vector.h"
32
#include "core/data_type/data_type.h"
33
#include "core/data_type/data_type_array.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_number.h"
36
#include "exec/common/util.hpp"
37
#include "exprs/aggregate/aggregate_function.h"
38
#include "exprs/lambda_function/lambda_function.h"
39
#include "exprs/lambda_function/lambda_function_factory.h"
40
#include "exprs/vcolumn_ref.h"
41
#include "exprs/vslot_ref.h"
42
43
namespace doris {
44
class VExprContext;
45
46
// extend a block with all required parameters
47
// Bookkeeping shared by ArrayMapFunction::execute while it expands the rows of the
// original block into batches of nested array elements.
struct LambdaArgs {
48
    // column ids of every slot ref found in the lambda function body
49
    std::vector<int> output_slot_ref_indexs;
50
    // index of the row of the original block that is currently being expanded
51
    int64_t current_row_idx = 0;
52
    // when a batch is filled, the current array may be cut in the middle; this records
    // how many elements of the current row's array have already been consumed
53
    int64_t current_offset_in_array = 0;
54
    // position in the nested data column where the current row's array begins
55
    size_t array_start = 0;
56
    // number of elements in the current row's array
57
    int64_t cur_size = 0;
58
    // offsets of the first array argument (non-owning pointer)
59
    const ColumnArray::Offsets64* offsets_ptr = nullptr;
60
    // how many times the current row's slot values must be repeated by the next
    // _extend_data call; reset to 0 after each call
61
    int current_repeat_times = 0;
62
    // true once every element of the current row's array has been consumed
63
    bool current_row_eos = false;
64
};
65
66
class ArrayMapFunction : public LambdaFunction {
67
    ENABLE_FACTORY_CREATOR(ArrayMapFunction);
68
69
public:
70
0
    // Defaulted destructor; overrides the base class destructor.
    ~ArrayMapFunction() override = default;
71
72
    static constexpr auto name = "array_map";
73
74
0
    // Builds a new ArrayMapFunction with std::make_shared and returns it as a LambdaFunctionPtr.
    static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); }
75
76
0
    // Returns the function name, i.e. the static `name` constant ("array_map").
    std::string get_name() const override { return name; }
77
78
    // Evaluates array_map(lambda, arr1, arr2, ...) for `count` rows of `block`.
    //
    // children[0] is the lambda expression, children[1:] are the array arguments.
    // The function
    //   1. evaluates every array argument against `block`,
    //   2. strips the outer nullable wrapper (merging the null maps) and takes the nested
    //      data column of each array, checking that all arrays share the same offsets,
    //   3. runs the lambda batch by batch on an internal `lambda_block` whose columns
    //      [0, gap) sit at the same positions as in `block` (slots referenced by the
    //      lambda hold repeated row values, the others are const UInt8 placeholders named
    //      "temp") and whose columns [gap, gap + #arrays) hold nested array elements,
    //   4. wraps the accumulated lambda results into an array column that reuses a copy of
    //      the first argument's offsets; the nested column is always nullable and the
    //      outer column is nullable iff `result_type` is nullable.
    //
    // Returns Status::InvalidArgument when the array arguments differ in nested size or
    // offsets; errors from evaluating the children are propagated by RETURN_IF_ERROR.
    // On success the result is written to `result_column`.
    Status execute(VExprContext* context, const Block* block, const Selector* expr_selector,
79
                   size_t count, ColumnPtr& result_column, const DataTypePtr& result_type,
80
0
                   const VExprSPtrs& children) const override {
81
0
        LambdaArgs args_info;
82
        // collect the column ids of the slot refs used in the lambda function body
83
0
        std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs;
84
0
        _collect_slot_ref_column_id(children[0], output_slot_ref_indexs);
85
86
        // gap = (largest referenced slot column id) + 1; it is the number of leading
        // columns of lambda_block and the index of the first nested-data column
0
        int gap = 0;
87
0
        if (!output_slot_ref_indexs.empty()) {
88
0
            auto max_id =
89
0
                    std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end());
90
0
            gap = *max_id + 1;
91
0
            _set_column_ref_column_id(children[0], gap);
92
0
        }
93
94
0
        std::vector<std::string> names(gap);
95
0
        DataTypes data_types(gap);
96
97
0
        for (int i = 0; i < gap; ++i) {
98
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
99
0
                names[i] = block->get_by_position(i).name;
100
0
                data_types[i] = block->get_by_position(i).type;
101
0
            } else {
102
                // pad with mock data to hold the position, so that e.g. Block::rows() still works
103
0
                names[i] = "temp";
104
0
                data_types[i] = std::make_shared<DataTypeUInt8>();
105
0
            }
106
0
        }
107
108
        ///* array_map(lambda,arg1,arg2,.....) *///
109
        //1. evaluate every array argument: child[1:end]->execute(src_block)
        // NOTE(review): assumes children holds the lambda plus at least one array argument;
        // arguments[0] is accessed unconditionally below - confirm the planner guarantees this
110
0
        ColumnsWithTypeAndName arguments(children.size() - 1);
111
0
        for (int i = 1; i < children.size(); ++i) {
112
0
            ColumnPtr column;
113
0
            RETURN_IF_ERROR(
114
0
                    children[i]->execute_column(context, block, expr_selector, count, column));
115
0
            arguments[i - 1].column = column;
116
0
            arguments[i - 1].type = children[i]->execute_type(block);
117
0
            arguments[i - 1].name = children[i]->expr_name();
118
0
        }
119
120
        // null map of the outer (array level) column: one entry per row of the first
        // argument, initialised to 0
121
0
        auto outside_null_map = ColumnUInt8::create(
122
0
                arguments[0].column->convert_to_full_column_if_const()->size(), 0);
123
        // offset column of the result (a copy of the first argument's offsets)
124
0
        MutableColumnPtr array_column_offset;
125
0
        size_t nested_array_column_rows = 0;
126
0
        ColumnPtr first_array_offsets = nullptr;
127
        //2. from every executed argument, take the nested data column of the array
128
0
        std::vector<ColumnPtr> lambda_datas(arguments.size());
129
130
0
        for (int i = 0; i < arguments.size(); ++i) {
131
0
            const auto& array_column_type_name = arguments[i];
132
0
            auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
133
0
            auto type_array = array_column_type_name.type;
134
0
            if (type_array->is_nullable()) {
135
                // get the null map of the nullable column
136
                // hold the null map column itself instead of a reference, because `column_array` is reassigned (and the old value freed) below.
137
0
                auto column_array_nullmap =
138
0
                        assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();
139
140
                // get the array column from nullable column
141
0
                column_array = assert_cast<const ColumnNullable*>(column_array.get())
142
0
                                       ->get_nested_column_ptr();
143
144
                // get the nested type from nullable type
145
0
                type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
146
0
                                     ->get_nested_type();
147
148
                // need to union the null maps of all argument columns
149
0
                VectorizedUtils::update_null_map(outside_null_map->get_data(),
150
0
                                                 column_array_nullmap->get_data());
151
0
            }
152
153
            // here is the array column
154
0
            const auto& col_array = assert_cast<const ColumnArray&>(*column_array);
155
0
            const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
156
157
0
            if (i == 0) {
                // the first argument defines the reference shape (nested size and offsets)
158
0
                nested_array_column_rows = col_array.get_data_ptr()->size();
159
0
                first_array_offsets = col_array.get_offsets_ptr();
160
0
                const auto& off_data = col_array.get_offsets_column();
161
0
                array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size());
162
0
                args_info.offsets_ptr = &col_array.get_offsets();
163
0
            } else {
                // every further argument must match the first one: same nested size and
                // byte-identical offsets, e.g. the following query is rejected:
164
                // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
165
                // c_array1: [0,1,2,3,4,5,6,7,8,9]
166
0
                const auto& array_offsets =
167
0
                        assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
168
0
                                .get_data();
169
0
                if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
170
0
                    (!array_offsets.empty() &&
171
0
                     memcmp(array_offsets.data(), col_array.get_offsets().data(),
172
0
                            sizeof(array_offsets[0]) * array_offsets.size()) != 0)) {
173
0
                    return Status::InvalidArgument(
174
0
                            "in array map function, the input column size "
175
0
                            "are "
176
0
                            "not equal completely, nested column data rows 1st size is {}, {}th "
177
0
                            "size is {}.",
178
0
                            nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
179
0
                }
180
0
            }
181
0
            lambda_datas[i] = col_array.get_data_ptr();
            // the nested data of argument i becomes column (gap + i) of lambda_block
182
0
            names.push_back("R" + array_column_type_name.name);
183
0
            data_types.push_back(col_type.get_nested_type());
184
0
        }
185
186
        // if column_array is NULL, we know the array_data_column will not hold any data,
187
        // so the column is empty. eg: (x) -> concat('|',x + "1"). If the lambda function were still executed, the block rows would not be equal:
188
        // the x column is empty, but "|" is a const literal whose column size is 1, so the block would have 1 row while x has none, which causes a core dump.
        // Therefore return early with an empty nested column and the offsets copied from the first argument.
189
0
        if (std::any_of(lambda_datas.begin(), lambda_datas.end(),
190
0
                        [](const auto& v) { return v->empty(); })) {
191
0
            DataTypePtr nested_type;
192
0
            bool is_nullable = result_type->is_nullable();
193
0
            if (is_nullable) {
194
0
                nested_type =
195
0
                        assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
196
0
            } else {
197
0
                nested_type = result_type;
198
0
            }
199
0
            auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
200
0
                                               ->get_nested_type()
201
0
                                               ->create_column();
202
0
            auto result_array_column = ColumnArray::create(std::move(empty_nested_column),
203
0
                                                           std::move(array_column_offset));
204
205
0
            if (is_nullable) {
206
0
                result_column = ColumnNullable::create(std::move(result_array_column),
207
0
                                                       std::move(outside_null_map));
208
0
            } else {
209
0
                result_column = std::move(result_array_column);
210
0
            }
211
212
0
            return Status::OK();
213
0
        }
214
215
        // accumulates the lambda results of all batches
0
        MutableColumnPtr result_col = nullptr;
216
0
        DataTypePtr res_type;
217
218
        // process the first row
        // NOTE(review): current_row_idx is 0 here, so this reads offsets[-1]; presumably the
        // offsets container tolerates index -1 and yields 0 - confirm
219
0
        args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
220
0
        args_info.cur_size =
221
0
                (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;
222
223
        // lambda block used to execute the lambda; its memory is reused across batches
224
0
        Block lambda_block;
225
0
        auto column_size = names.size();
226
0
        MutableColumns columns(column_size);
        // one iteration per batch: fill lambda_block, run the lambda, append its result
227
0
        do {
228
            // NOTE(review): presumably true once lambda_block already holds the columns
            // inserted by a previous iteration - confirm against Block::mem_reuse
0
            bool mem_reuse = lambda_block.mem_reuse();
229
0
            for (int i = 0; i < column_size; i++) {
230
0
                if (mem_reuse) {
                    // take the column back out of lambda_block to refill it
231
0
                    columns[i] = lambda_block.get_by_position(i).column->assert_mutable();
232
0
                } else {
233
0
                    if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) {
234
                        // TODO: maybe could create a const column, so no insert_many_from is needed when extending data
235
                        // but for now a batch_size of array nested data is handled every time, so the rows may differ
236
0
                        columns[i] = data_types[i]->create_column();
237
0
                    } else {
                        // unreferenced position: const placeholder column
238
0
                        columns[i] = data_types[i]
239
0
                                             ->create_column_const_with_default_value(0)
240
0
                                             ->assert_mutable();
241
0
                    }
242
0
                }
243
0
            }
244
            // handle batch_size of array nested data every time in order to avoid memory overflow
            // NOTE(review): batch_size is not declared in this file; presumably a member
            // inherited from LambdaFunction - confirm
            // columns[gap] is the nested data column of the first array argument
245
0
            while (columns[gap]->size() < batch_size) {
                // room left in this batch
246
0
                long max_step = batch_size - columns[gap]->size();
                // elements to copy now: limited by the room left and by what remains of the
                // current row's array
247
0
                long current_step = std::min(
248
0
                        max_step, (long)(args_info.cur_size - args_info.current_offset_in_array));
                // position of the next unconsumed element in the nested data columns
249
0
                size_t pos = args_info.array_start + args_info.current_offset_in_array;
250
0
                for (int i = 0; i < arguments.size() && current_step > 0; ++i) {
251
0
                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step);
252
0
                }
253
0
                args_info.current_offset_in_array += current_step;
254
0
                args_info.current_repeat_times += current_step;
255
0
                if (args_info.current_offset_in_array >= args_info.cur_size) {
256
0
                    args_info.current_row_eos = true;
257
0
                }
                // grow the leading [0, gap) columns by the number of elements just copied
258
0
                _extend_data(columns, block, args_info.current_repeat_times, gap,
259
0
                             args_info.current_row_idx, output_slot_ref_indexs);
260
0
                args_info.current_repeat_times = 0;
261
0
                if (args_info.current_row_eos) {
262
                    // the current row's array is exhausted, move to the next row
263
0
                    args_info.current_row_idx++;
264
0
                    args_info.current_offset_in_array = 0;
265
0
                    if (args_info.current_row_idx >= count) {
266
0
                        break;
267
0
                    }
268
0
                    args_info.current_row_eos = false;
269
0
                    args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
270
0
                    args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
271
0
                                         args_info.array_start;
272
0
                }
273
0
            }
274
275
            // first batch only: hand the freshly created columns over to lambda_block
0
            if (!mem_reuse) {
276
0
                for (int i = 0; i < column_size; ++i) {
277
0
                    lambda_block.insert(
278
0
                            ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i]));
279
0
                }
280
0
            }
281
            //3. run the lambda body on the batch: child[0]->execute(new_block)
282
283
0
            ColumnPtr res_col;
284
            // lambda body executes on the internal lambda_block, not the original block.
285
            // The outer expr_selector is irrelevant here, so pass nullptr.
286
0
            RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr,
287
0
                                                        lambda_block.rows(), res_col));
288
0
            res_col = res_col->convert_to_full_column_if_const();
289
0
            res_type = children[0]->execute_type(&lambda_block);
290
291
            // the first batch fixes the concrete column type of the accumulated result
0
            if (!result_col) {
292
0
                result_col = res_col->clone_empty();
293
0
            }
294
0
            result_col->insert_range_from(*res_col, 0, res_col->size());
            // NOTE(review): presumably drops the batch data but keeps the columns so the
            // next iteration can reuse them - confirm against Block::clear_column_data
295
0
            lambda_block.clear_column_data(column_size);
296
0
        } while (args_info.current_row_idx < count);
297
298
        //4. get the result column after execution, reassemble it into a new array column, and return.
        // The nested column of the result array is always nullable: if the lambda result
        // type is not nullable, it is wrapped together with an all-zero null map.
299
0
        if (result_type->is_nullable()) {
300
0
            if (res_type->is_nullable()) {
301
0
                result_column = ColumnNullable::create(
302
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset)),
303
0
                        std::move(outside_null_map));
304
0
            } else {
305
                // deal with eg: select array_map(x -> x is null, [null, 1, 2]);
306
                // need to create the nested column null map for column array
307
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
308
309
0
                result_column = ColumnNullable::create(
310
0
                        ColumnArray::create(ColumnNullable::create(std::move(result_col),
311
0
                                                                   std::move(nested_null_map)),
312
0
                                            std::move(array_column_offset)),
313
0
                        std::move(outside_null_map));
314
0
            }
315
0
        } else {
316
0
            if (res_type->is_nullable()) {
317
0
                result_column =
318
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset));
319
0
            } else {
320
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
321
322
0
                result_column = ColumnArray::create(
323
0
                        ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
324
0
                        std::move(array_column_offset));
325
0
            }
326
0
        }
327
0
        return Status::OK();
328
0
    }
329
330
private:
331
0
    // Returns true when `id` occurs in `output_slot_ref_indexs` (linear search with std::find).
    bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const {
332
0
        const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id);
333
0
        return it != output_slot_ref_indexs.end();
334
0
    }
335
336
0
    // Recursively walks the descendants of `expr` and calls set_gap(gap) on every child
    // for which is_column_ref() is true. `expr` itself is not inspected, and the walk
    // does not descend into a child that is a column ref.
    void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
337
0
        for (const auto& child : expr->children()) {
338
0
            if (child->is_column_ref()) {
339
0
                auto* ref = static_cast<VColumnRef*>(child.get());
340
0
                ref->set_gap(gap);
341
0
            } else {
342
0
                _set_column_ref_column_id(child, gap);
343
0
            }
344
0
        }
345
0
    }
346
347
    // Recursively walks the descendants of `expr` and appends the column_id() of every
    // child for which is_slot_ref() is true to `output_slot_ref_indexs`. `expr` itself
    // is not inspected. Ids are appended as found, so the same id may appear more than once.
    void _collect_slot_ref_column_id(VExprSPtr expr,
348
0
                                     std::vector<int>& output_slot_ref_indexs) const {
349
0
        for (const auto& child : expr->children()) {
350
0
            if (child->is_slot_ref()) {
351
0
                const auto* ref = static_cast<VSlotRef*>(child.get());
352
0
                output_slot_ref_indexs.push_back(ref->column_id());
353
0
            } else {
354
0
                _collect_slot_ref_column_id(child, output_slot_ref_indexs);
355
0
            }
356
0
        }
357
0
    }
358
359
    // Grows the first `size` entries of `columns` by `current_repeat_times` rows each.
    //   columns:                columns being filled for the lambda block
    //   block:                  original block that supplies the slot values
    //   current_repeat_times:   number of rows to add; nothing is done when it is 0
    //   size:                   number of leading columns to extend; nothing is done when it is 0
    //   current_row_idx:        row of `block` whose values are used
    //   output_slot_ref_indexs: column ids referenced by the lambda body
    // A column whose index is in `output_slot_ref_indexs` receives the value of row
    // `current_row_idx` of the matching `block` column via insert_many_from; every other
    // column is expected to be a const column and is only resized.
    void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block,
360
                      int current_repeat_times, int size, int64_t current_row_idx,
361
0
                      const std::vector<int>& output_slot_ref_indexs) const {
362
0
        if (!current_repeat_times || !size) {
363
0
            return;
364
0
        }
365
0
        for (int i = 0; i < size; i++) {
366
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
                // materialize a const source column before reading a row from it
367
0
                auto src_column =
368
0
                        block->get_by_position(i).column->convert_to_full_column_if_const();
369
0
                columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times);
370
0
            } else {
371
                // must be a const column (placeholder for an unreferenced position)
372
0
                DCHECK(is_column_const(*columns[i]));
373
0
                columns[i]->resize(columns[i]->size() + current_repeat_times);
374
0
            }
375
0
        }
376
0
    }
377
};
378
379
0
// Registers ArrayMapFunction with the given lambda function factory.
void register_function_array_map(doris::LambdaFunctionFactory& factory) {
380
0
    factory.register_function<ArrayMapFunction>();
381
0
}
382
383
} // namespace doris