Coverage Report

Created: 2026-05-28 20:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/lambda_function/varray_map_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <memory>
19
#include <string>
20
#include <vector>
21
22
#include "common/status.h"
23
#include "core/assert_cast.h"
24
#include "core/block/block.h"
25
#include "core/block/column_numbers.h"
26
#include "core/block/column_with_type_and_name.h"
27
#include "core/block/columns_with_type_and_name.h"
28
#include "core/column/column.h"
29
#include "core/column/column_array.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_vector.h"
32
#include "core/data_type/data_type.h"
33
#include "core/data_type/data_type_array.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_number.h"
36
#include "exec/common/util.hpp"
37
#include "exprs/aggregate/aggregate_function.h"
38
#include "exprs/lambda_function/lambda_function.h"
39
#include "exprs/lambda_function/lambda_function_factory.h"
40
#include "exprs/vcolumn_ref.h"
41
#include "exprs/vslot_ref.h"
42
43
namespace doris {
44
class VExprContext;
45
46
// extend a block with all required parameters
47
struct LambdaArgs {
48
    // the lambda function need the column ids of all the slots
49
    std::vector<int> output_slot_ref_indexs;
50
    // which line is extended to the original block
51
    int64_t current_row_idx = 0;
52
    // when a block is filled, the array may be truncated, recording where it was truncated
53
    int64_t current_offset_in_array = 0;
54
    // the beginning position of the array
55
    size_t array_start = 0;
56
    // the size of the array
57
    int64_t cur_size = 0;
58
    // offset of column array
59
    const ColumnArray::Offsets64* offsets_ptr = nullptr;
60
    // expend data of repeat times
61
    int current_repeat_times = 0;
62
    // whether the current row of the original block has been extended
63
    bool current_row_eos = false;
64
};
65
66
class ArrayMapFunction : public LambdaFunction {
67
    ENABLE_FACTORY_CREATOR(ArrayMapFunction);
68
69
public:
70
0
    ~ArrayMapFunction() override = default;
71
72
    static constexpr auto name = "array_map";
73
74
0
    static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); }
75
76
0
    std::string get_name() const override { return name; }
77
78
    Status execute(VExprContext* context, const Block* block, const Selector* expr_selector,
79
                   size_t count, ColumnPtr& result_column, const DataTypePtr& result_type,
80
0
                   const VExprSPtrs& children) const override {
81
0
        LambdaArgs args_info;
82
        // collect used slot ref in lambda function body
83
0
        std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs;
84
0
        _collect_slot_ref_column_id(children[0], output_slot_ref_indexs);
85
86
0
        int gap = 0;
87
0
        if (!output_slot_ref_indexs.empty()) {
88
0
            auto max_id =
89
0
                    std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end());
90
0
            gap = *max_id + 1;
91
0
            _set_column_ref_column_id(children[0], gap);
92
0
        }
93
94
0
        std::vector<std::string> names(gap);
95
0
        DataTypes data_types(gap);
96
97
0
        for (int i = 0; i < gap; ++i) {
98
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
99
0
                names[i] = block->get_by_position(i).name;
100
0
                data_types[i] = block->get_by_position(i).type;
101
0
            } else {
102
                // padding some mock data to hold the position, like call block#rows function need
103
0
                names[i] = "temp";
104
0
                data_types[i] = std::make_shared<DataTypeUInt8>();
105
0
            }
106
0
        }
107
108
        ///* array_map(lambda,arg1,arg2,.....) *///
109
        //1. child[1:end]->execute(src_block)
110
0
        ColumnsWithTypeAndName arguments(children.size() - 1);
111
0
        for (int i = 1; i < children.size(); ++i) {
112
0
            ColumnPtr column;
113
0
            RETURN_IF_ERROR(
114
0
                    children[i]->execute_column(context, block, expr_selector, count, column));
115
0
            arguments[i - 1].column = column;
116
0
            arguments[i - 1].type = children[i]->execute_type(block);
117
0
            arguments[i - 1].name = children[i]->expr_name();
118
0
        }
119
120
        // used for save column array outside null map
121
0
        auto outside_null_map = ColumnUInt8::create(
122
0
                arguments[0].column->convert_to_full_column_if_const()->size(), 0);
123
        // offset column
124
0
        MutableColumnPtr array_column_offset;
125
0
        size_t nested_array_column_rows = 0;
126
0
        ColumnPtr first_array_offsets = nullptr;
127
        //2. get the result column from executed expr, and the needed is nested column of array
128
0
        std::vector<ColumnPtr> lambda_datas(arguments.size());
129
130
0
        for (int i = 0; i < arguments.size(); ++i) {
131
0
            const auto& array_column_type_name = arguments[i];
132
0
            auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
133
0
            auto type_array = array_column_type_name.type;
134
0
            if (type_array->is_nullable()) {
135
                // get the nullmap of nullable column
136
                // hold the null column instead of a reference 'cause `column_array` will be assigned and freed below.
137
0
                auto column_array_nullmap =
138
0
                        assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();
139
140
                // get the array column from nullable column
141
0
                column_array = assert_cast<const ColumnNullable*>(column_array.get())
142
0
                                       ->get_nested_column_ptr();
143
144
                // get the nested type from nullable type
145
0
                type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
146
0
                                     ->get_nested_type();
147
148
                // need to union nullmap from all columns
149
0
                VectorizedUtils::update_null_map(outside_null_map->get_data(),
150
0
                                                 column_array_nullmap->get_data());
151
0
            }
152
153
            // here is the array column
154
0
            const auto& col_array = assert_cast<const ColumnArray&>(*column_array);
155
0
            const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
156
157
0
            if (i == 0) {
158
0
                nested_array_column_rows = col_array.get_data_ptr()->size();
159
0
                first_array_offsets = col_array.get_offsets_ptr();
160
0
                const auto& off_data = assert_cast<const ColumnArray::ColumnOffsets&>(
161
0
                        col_array.get_offsets_column());
162
0
                array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size());
163
0
                args_info.offsets_ptr = &col_array.get_offsets();
164
0
            } else {
165
                // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
166
                // c_array1: [0,1,2,3,4,5,6,7,8,9]
167
0
                const auto& array_offsets =
168
0
                        assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
169
0
                                .get_data();
170
0
                if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
171
0
                    (!array_offsets.empty() &&
172
0
                     memcmp(array_offsets.data(), col_array.get_offsets().data(),
173
0
                            sizeof(array_offsets[0]) * array_offsets.size()) != 0)) {
174
0
                    return Status::InvalidArgument(
175
0
                            "in array map function, the input column size "
176
0
                            "are "
177
0
                            "not equal completely, nested column data rows 1st size is {}, {}th "
178
0
                            "size is {}.",
179
0
                            nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
180
0
                }
181
0
            }
182
0
            lambda_datas[i] = col_array.get_data_ptr();
183
0
            names.push_back("R" + array_column_type_name.name);
184
0
            data_types.push_back(col_type.get_nested_type());
185
0
        }
186
187
        // if column_array is NULL, we know the array_data_column will not write any data,
188
        // so the column is empty. eg : (x) -> concat('|',x + "1"). if still execute the lambda function, will cause the bolck rows are not equal
189
        // the x column is empty, but "|" is const literal, size of column is 1, so the block rows is 1, but the x column is empty, will be coredump.
190
0
        if (std::any_of(lambda_datas.begin(), lambda_datas.end(),
191
0
                        [](const auto& v) { return v->empty(); })) {
192
0
            DataTypePtr nested_type;
193
0
            bool is_nullable = result_type->is_nullable();
194
0
            if (is_nullable) {
195
0
                nested_type =
196
0
                        assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
197
0
            } else {
198
0
                nested_type = result_type;
199
0
            }
200
0
            auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
201
0
                                               ->get_nested_type()
202
0
                                               ->create_column();
203
0
            auto result_array_column = ColumnArray::create(std::move(empty_nested_column),
204
0
                                                           std::move(array_column_offset));
205
206
0
            if (is_nullable) {
207
0
                result_column = ColumnNullable::create(std::move(result_array_column),
208
0
                                                       std::move(outside_null_map));
209
0
            } else {
210
0
                result_column = std::move(result_array_column);
211
0
            }
212
213
0
            return Status::OK();
214
0
        }
215
216
0
        MutableColumnPtr result_col = nullptr;
217
0
        DataTypePtr res_type;
218
219
        //process first row
220
0
        args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
221
0
        args_info.cur_size =
222
0
                (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;
223
224
        // lambda block to exectute the lambda, and reuse the memory
225
0
        Block lambda_block;
226
0
        auto column_size = names.size();
227
0
        MutableColumns columns(column_size);
228
0
        do {
229
0
            bool mem_reuse = lambda_block.mem_reuse();
230
0
            for (int i = 0; i < column_size; i++) {
231
0
                if (mem_reuse) {
232
0
                    columns[i] = lambda_block.get_by_position(i).column->assert_mutable();
233
0
                } else {
234
0
                    if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) {
235
                        // TODO: maybe could create const column, so not insert_many_from when extand data
236
                        // but now here handle batch_size of array nested data every time, so maybe have different rows
237
0
                        columns[i] = data_types[i]->create_column();
238
0
                    } else {
239
0
                        columns[i] = data_types[i]
240
0
                                             ->create_column_const_with_default_value(0)
241
0
                                             ->assert_mutable();
242
0
                    }
243
0
                }
244
0
            }
245
            // batch_size of array nested data every time inorder to avoid memory overflow
246
0
            while (columns[gap]->size() < batch_size) {
247
0
                long max_step = batch_size - columns[gap]->size();
248
0
                long current_step = std::min(
249
0
                        max_step, (long)(args_info.cur_size - args_info.current_offset_in_array));
250
0
                size_t pos = args_info.array_start + args_info.current_offset_in_array;
251
0
                for (int i = 0; i < arguments.size() && current_step > 0; ++i) {
252
0
                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step);
253
0
                }
254
0
                args_info.current_offset_in_array += current_step;
255
0
                args_info.current_repeat_times += current_step;
256
0
                if (args_info.current_offset_in_array >= args_info.cur_size) {
257
0
                    args_info.current_row_eos = true;
258
0
                }
259
0
                _extend_data(columns, block, args_info.current_repeat_times, gap,
260
0
                             args_info.current_row_idx, output_slot_ref_indexs);
261
0
                args_info.current_repeat_times = 0;
262
0
                if (args_info.current_row_eos) {
263
                    //current row is end of array, move to next row
264
0
                    args_info.current_row_idx++;
265
0
                    args_info.current_offset_in_array = 0;
266
0
                    if (args_info.current_row_idx >= count) {
267
0
                        break;
268
0
                    }
269
0
                    args_info.current_row_eos = false;
270
0
                    args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
271
0
                    args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
272
0
                                         args_info.array_start;
273
0
                }
274
0
            }
275
276
0
            if (!mem_reuse) {
277
0
                for (int i = 0; i < column_size; ++i) {
278
0
                    lambda_block.insert(
279
0
                            ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i]));
280
0
                }
281
0
            }
282
            //3. child[0]->execute(new_block)
283
284
0
            ColumnPtr res_col;
285
            // lambda body executes on the internal lambda_block, not the original block.
286
            // The outer expr_selector is irrelevant here, so pass nullptr.
287
0
            RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr,
288
0
                                                        lambda_block.rows(), res_col));
289
0
            res_col = res_col->convert_to_full_column_if_const();
290
0
            res_type = children[0]->execute_type(&lambda_block);
291
292
0
            if (!result_col) {
293
0
                result_col = res_col->clone_empty();
294
0
            }
295
0
            result_col->insert_range_from(*res_col, 0, res_col->size());
296
0
            lambda_block.clear_column_data(column_size);
297
0
        } while (args_info.current_row_idx < count);
298
299
        //4. get the result column after execution, reassemble it into a new array column, and return.
300
0
        if (result_type->is_nullable()) {
301
0
            if (res_type->is_nullable()) {
302
0
                result_column = ColumnNullable::create(
303
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset)),
304
0
                        std::move(outside_null_map));
305
0
            } else {
306
                // deal with eg: select array_map(x -> x is null, [null, 1, 2]);
307
                // need to create the nested column null map for column array
308
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
309
310
0
                result_column = ColumnNullable::create(
311
0
                        ColumnArray::create(ColumnNullable::create(std::move(result_col),
312
0
                                                                   std::move(nested_null_map)),
313
0
                                            std::move(array_column_offset)),
314
0
                        std::move(outside_null_map));
315
0
            }
316
0
        } else {
317
0
            if (res_type->is_nullable()) {
318
0
                result_column =
319
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset));
320
0
            } else {
321
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
322
323
0
                result_column = ColumnArray::create(
324
0
                        ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
325
0
                        std::move(array_column_offset));
326
0
            }
327
0
        }
328
0
        return Status::OK();
329
0
    }
330
331
private:
332
0
    bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const {
333
0
        const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id);
334
0
        return it != output_slot_ref_indexs.end();
335
0
    }
336
337
0
    void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
338
0
        for (const auto& child : expr->children()) {
339
0
            if (child->is_column_ref()) {
340
0
                auto* ref = static_cast<VColumnRef*>(child.get());
341
0
                ref->set_gap(gap);
342
0
            } else {
343
0
                _set_column_ref_column_id(child, gap);
344
0
            }
345
0
        }
346
0
    }
347
348
    void _collect_slot_ref_column_id(VExprSPtr expr,
349
0
                                     std::vector<int>& output_slot_ref_indexs) const {
350
0
        for (const auto& child : expr->children()) {
351
0
            if (child->is_slot_ref()) {
352
0
                const auto* ref = static_cast<VSlotRef*>(child.get());
353
0
                output_slot_ref_indexs.push_back(ref->column_id());
354
0
            } else {
355
0
                _collect_slot_ref_column_id(child, output_slot_ref_indexs);
356
0
            }
357
0
        }
358
0
    }
359
360
    void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block,
361
                      int current_repeat_times, int size, int64_t current_row_idx,
362
0
                      const std::vector<int>& output_slot_ref_indexs) const {
363
0
        if (!current_repeat_times || !size) {
364
0
            return;
365
0
        }
366
0
        for (int i = 0; i < size; i++) {
367
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
368
0
                auto src_column =
369
0
                        block->get_by_position(i).column->convert_to_full_column_if_const();
370
0
                columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times);
371
0
            } else {
372
                // must be column const
373
0
                DCHECK(is_column_const(*columns[i]));
374
0
                columns[i]->resize(columns[i]->size() + current_repeat_times);
375
0
            }
376
0
        }
377
0
    }
378
};
379
380
0
// Registers the array_map lambda function implementation with `factory`.
void register_function_array_map(doris::LambdaFunctionFactory& factory) {
    using ArrayMapImpl = ArrayMapFunction;
    factory.register_function<ArrayMapImpl>();
}
383
384
} // namespace doris