Coverage Report

Created: 2026-03-15 08:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/lambda_function/varray_map_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <memory>
19
#include <string>
20
#include <vector>
21
22
#include "common/status.h"
23
#include "core/assert_cast.h"
24
#include "core/block/block.h"
25
#include "core/block/column_numbers.h"
26
#include "core/block/column_with_type_and_name.h"
27
#include "core/block/columns_with_type_and_name.h"
28
#include "core/column/column.h"
29
#include "core/column/column_array.h"
30
#include "core/column/column_nullable.h"
31
#include "core/column/column_vector.h"
32
#include "core/data_type/data_type.h"
33
#include "core/data_type/data_type_array.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_number.h"
36
#include "exec/common/util.hpp"
37
#include "exprs/aggregate/aggregate_function.h"
38
#include "exprs/lambda_function/lambda_function.h"
39
#include "exprs/lambda_function/lambda_function_factory.h"
40
#include "exprs/vcolumn_ref.h"
41
#include "exprs/vslot_ref.h"
42
43
namespace doris {
44
#include "common/compile_check_begin.h"
45
// Forward declaration: this file only passes VExprContext through by pointer
// (to the children's execute_column calls), so the full definition is not needed.
class VExprContext;
46
47
// extend a block with all required parameters
48
struct LambdaArgs {
49
    // the lambda function need the column ids of all the slots
50
    std::vector<int> output_slot_ref_indexs;
51
    // which line is extended to the original block
52
    int64_t current_row_idx = 0;
53
    // when a block is filled, the array may be truncated, recording where it was truncated
54
    int64_t current_offset_in_array = 0;
55
    // the beginning position of the array
56
    size_t array_start = 0;
57
    // the size of the array
58
    int64_t cur_size = 0;
59
    // offset of column array
60
    const ColumnArray::Offsets64* offsets_ptr = nullptr;
61
    // expend data of repeat times
62
    int current_repeat_times = 0;
63
    // whether the current row of the original block has been extended
64
    bool current_row_eos = false;
65
};
66
67
class ArrayMapFunction : public LambdaFunction {
68
    ENABLE_FACTORY_CREATOR(ArrayMapFunction);
69
70
public:
71
0
    ~ArrayMapFunction() override = default;
72
73
    static constexpr auto name = "array_map";
74
75
0
    static LambdaFunctionPtr create() { return std::make_shared<ArrayMapFunction>(); }
76
77
0
    std::string get_name() const override { return name; }
78
79
    Status execute(VExprContext* context, const Block* block, Selector* expr_selector, size_t count,
80
                   ColumnPtr& result_column, const DataTypePtr& result_type,
81
0
                   const VExprSPtrs& children) const override {
82
0
        LambdaArgs args_info;
83
        // collect used slot ref in lambda function body
84
0
        std::vector<int>& output_slot_ref_indexs = args_info.output_slot_ref_indexs;
85
0
        _collect_slot_ref_column_id(children[0], output_slot_ref_indexs);
86
87
0
        int gap = 0;
88
0
        if (!output_slot_ref_indexs.empty()) {
89
0
            auto max_id =
90
0
                    std::max_element(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end());
91
0
            gap = *max_id + 1;
92
0
            _set_column_ref_column_id(children[0], gap);
93
0
        }
94
95
0
        std::vector<std::string> names(gap);
96
0
        DataTypes data_types(gap);
97
98
0
        for (int i = 0; i < gap; ++i) {
99
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
100
0
                names[i] = block->get_by_position(i).name;
101
0
                data_types[i] = block->get_by_position(i).type;
102
0
            } else {
103
                // padding some mock data to hold the position, like call block#rows function need
104
0
                names[i] = "temp";
105
0
                data_types[i] = std::make_shared<DataTypeUInt8>();
106
0
            }
107
0
        }
108
109
        ///* array_map(lambda,arg1,arg2,.....) *///
110
        //1. child[1:end]->execute(src_block)
111
0
        ColumnsWithTypeAndName arguments(children.size() - 1);
112
0
        for (int i = 1; i < children.size(); ++i) {
113
0
            ColumnPtr column;
114
0
            RETURN_IF_ERROR(
115
0
                    children[i]->execute_column(context, block, expr_selector, count, column));
116
0
            arguments[i - 1].column = column;
117
0
            arguments[i - 1].type = children[i]->execute_type(block);
118
0
            arguments[i - 1].name = children[i]->expr_name();
119
0
        }
120
121
        // used for save column array outside null map
122
0
        auto outside_null_map = ColumnUInt8::create(
123
0
                arguments[0].column->convert_to_full_column_if_const()->size(), 0);
124
        // offset column
125
0
        MutableColumnPtr array_column_offset;
126
0
        size_t nested_array_column_rows = 0;
127
0
        ColumnPtr first_array_offsets = nullptr;
128
        //2. get the result column from executed expr, and the needed is nested column of array
129
0
        std::vector<ColumnPtr> lambda_datas(arguments.size());
130
131
0
        for (int i = 0; i < arguments.size(); ++i) {
132
0
            const auto& array_column_type_name = arguments[i];
133
0
            auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
134
0
            auto type_array = array_column_type_name.type;
135
0
            if (type_array->is_nullable()) {
136
                // get the nullmap of nullable column
137
                // hold the null column instead of a reference 'cause `column_array` will be assigned and freed below.
138
0
                auto column_array_nullmap =
139
0
                        assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();
140
141
                // get the array column from nullable column
142
0
                column_array = assert_cast<const ColumnNullable*>(column_array.get())
143
0
                                       ->get_nested_column_ptr();
144
145
                // get the nested type from nullable type
146
0
                type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
147
0
                                     ->get_nested_type();
148
149
                // need to union nullmap from all columns
150
0
                VectorizedUtils::update_null_map(
151
0
                        outside_null_map->get_data(),
152
0
                        assert_cast<const ColumnUInt8&>(*column_array_nullmap).get_data());
153
0
            }
154
155
            // here is the array column
156
0
            const auto& col_array = assert_cast<const ColumnArray&>(*column_array);
157
0
            const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
158
159
0
            if (i == 0) {
160
0
                nested_array_column_rows = col_array.get_data_ptr()->size();
161
0
                first_array_offsets = col_array.get_offsets_ptr();
162
0
                const auto& off_data = assert_cast<const ColumnArray::ColumnOffsets&>(
163
0
                        col_array.get_offsets_column());
164
0
                array_column_offset = off_data.clone_resized(col_array.get_offsets_column().size());
165
0
                args_info.offsets_ptr = &col_array.get_offsets();
166
0
            } else {
167
                // select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
168
                // c_array1: [0,1,2,3,4,5,6,7,8,9]
169
0
                const auto& array_offsets =
170
0
                        assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
171
0
                                .get_data();
172
0
                if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
173
0
                    (!array_offsets.empty() &&
174
0
                     memcmp(array_offsets.data(), col_array.get_offsets().data(),
175
0
                            sizeof(array_offsets[0]) * array_offsets.size()) != 0)) {
176
0
                    return Status::InvalidArgument(
177
0
                            "in array map function, the input column size "
178
0
                            "are "
179
0
                            "not equal completely, nested column data rows 1st size is {}, {}th "
180
0
                            "size is {}.",
181
0
                            nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
182
0
                }
183
0
            }
184
0
            lambda_datas[i] = col_array.get_data_ptr();
185
0
            names.push_back("R" + array_column_type_name.name);
186
0
            data_types.push_back(col_type.get_nested_type());
187
0
        }
188
189
        // if column_array is NULL, we know the array_data_column will not write any data,
190
        // so the column is empty. eg : (x) -> concat('|',x + "1"). if still execute the lambda function, will cause the bolck rows are not equal
191
        // the x column is empty, but "|" is const literal, size of column is 1, so the block rows is 1, but the x column is empty, will be coredump.
192
0
        if (std::any_of(lambda_datas.begin(), lambda_datas.end(),
193
0
                        [](const auto& v) { return v->empty(); })) {
194
0
            DataTypePtr nested_type;
195
0
            bool is_nullable = result_type->is_nullable();
196
0
            if (is_nullable) {
197
0
                nested_type =
198
0
                        assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
199
0
            } else {
200
0
                nested_type = result_type;
201
0
            }
202
0
            auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
203
0
                                               ->get_nested_type()
204
0
                                               ->create_column();
205
0
            auto result_array_column = ColumnArray::create(std::move(empty_nested_column),
206
0
                                                           std::move(array_column_offset));
207
208
0
            if (is_nullable) {
209
0
                result_column = ColumnNullable::create(std::move(result_array_column),
210
0
                                                       std::move(outside_null_map));
211
0
            } else {
212
0
                result_column = std::move(result_array_column);
213
0
            }
214
215
0
            return Status::OK();
216
0
        }
217
218
0
        MutableColumnPtr result_col = nullptr;
219
0
        DataTypePtr res_type;
220
221
        //process first row
222
0
        args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
223
0
        args_info.cur_size =
224
0
                (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;
225
226
        // lambda block to exectute the lambda, and reuse the memory
227
0
        Block lambda_block;
228
0
        auto column_size = names.size();
229
0
        MutableColumns columns(column_size);
230
0
        do {
231
0
            bool mem_reuse = lambda_block.mem_reuse();
232
0
            for (int i = 0; i < column_size; i++) {
233
0
                if (mem_reuse) {
234
0
                    columns[i] = lambda_block.get_by_position(i).column->assume_mutable();
235
0
                } else {
236
0
                    if (_contains_column_id(output_slot_ref_indexs, i) || i >= gap) {
237
                        // TODO: maybe could create const column, so not insert_many_from when extand data
238
                        // but now here handle batch_size of array nested data every time, so maybe have different rows
239
0
                        columns[i] = data_types[i]->create_column();
240
0
                    } else {
241
0
                        columns[i] = data_types[i]
242
0
                                             ->create_column_const_with_default_value(0)
243
0
                                             ->assume_mutable();
244
0
                    }
245
0
                }
246
0
            }
247
            // batch_size of array nested data every time inorder to avoid memory overflow
248
0
            while (columns[gap]->size() < batch_size) {
249
0
                long max_step = batch_size - columns[gap]->size();
250
0
                long current_step = std::min(
251
0
                        max_step, (long)(args_info.cur_size - args_info.current_offset_in_array));
252
0
                size_t pos = args_info.array_start + args_info.current_offset_in_array;
253
0
                for (int i = 0; i < arguments.size() && current_step > 0; ++i) {
254
0
                    columns[gap + i]->insert_range_from(*lambda_datas[i], pos, current_step);
255
0
                }
256
0
                args_info.current_offset_in_array += current_step;
257
0
                args_info.current_repeat_times += current_step;
258
0
                if (args_info.current_offset_in_array >= args_info.cur_size) {
259
0
                    args_info.current_row_eos = true;
260
0
                }
261
0
                _extend_data(columns, block, args_info.current_repeat_times, gap,
262
0
                             args_info.current_row_idx, output_slot_ref_indexs);
263
0
                args_info.current_repeat_times = 0;
264
0
                if (args_info.current_row_eos) {
265
                    //current row is end of array, move to next row
266
0
                    args_info.current_row_idx++;
267
0
                    args_info.current_offset_in_array = 0;
268
0
                    if (args_info.current_row_idx >= count) {
269
0
                        break;
270
0
                    }
271
0
                    args_info.current_row_eos = false;
272
0
                    args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
273
0
                    args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
274
0
                                         args_info.array_start;
275
0
                }
276
0
            }
277
278
0
            if (!mem_reuse) {
279
0
                for (int i = 0; i < column_size; ++i) {
280
0
                    lambda_block.insert(
281
0
                            ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i]));
282
0
                }
283
0
            }
284
            //3. child[0]->execute(new_block)
285
286
0
            ColumnPtr res_col;
287
            // lambda body executes on the internal lambda_block, not the original block.
288
            // The outer expr_selector is irrelevant here, so pass nullptr.
289
0
            RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr,
290
0
                                                        lambda_block.rows(), res_col));
291
0
            res_col = res_col->convert_to_full_column_if_const();
292
0
            res_type = children[0]->execute_type(&lambda_block);
293
294
0
            if (!result_col) {
295
0
                result_col = res_col->clone_empty();
296
0
            }
297
0
            result_col->insert_range_from(*res_col, 0, res_col->size());
298
0
            lambda_block.clear_column_data(column_size);
299
0
        } while (args_info.current_row_idx < count);
300
301
        //4. get the result column after execution, reassemble it into a new array column, and return.
302
0
        if (result_type->is_nullable()) {
303
0
            if (res_type->is_nullable()) {
304
0
                result_column = ColumnNullable::create(
305
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset)),
306
0
                        std::move(outside_null_map));
307
0
            } else {
308
                // deal with eg: select array_map(x -> x is null, [null, 1, 2]);
309
                // need to create the nested column null map for column array
310
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
311
312
0
                result_column = ColumnNullable::create(
313
0
                        ColumnArray::create(ColumnNullable::create(std::move(result_col),
314
0
                                                                   std::move(nested_null_map)),
315
0
                                            std::move(array_column_offset)),
316
0
                        std::move(outside_null_map));
317
0
            }
318
0
        } else {
319
0
            if (res_type->is_nullable()) {
320
0
                result_column =
321
0
                        ColumnArray::create(std::move(result_col), std::move(array_column_offset));
322
0
            } else {
323
0
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);
324
325
0
                result_column = ColumnArray::create(
326
0
                        ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
327
0
                        std::move(array_column_offset));
328
0
            }
329
0
        }
330
0
        return Status::OK();
331
0
    }
332
333
private:
334
0
    bool _contains_column_id(const std::vector<int>& output_slot_ref_indexs, int id) const {
335
0
        const auto it = std::find(output_slot_ref_indexs.begin(), output_slot_ref_indexs.end(), id);
336
0
        return it != output_slot_ref_indexs.end();
337
0
    }
338
339
0
    void _set_column_ref_column_id(VExprSPtr expr, int gap) const {
340
0
        for (const auto& child : expr->children()) {
341
0
            if (child->is_column_ref()) {
342
0
                auto* ref = static_cast<VColumnRef*>(child.get());
343
0
                ref->set_gap(gap);
344
0
            } else {
345
0
                _set_column_ref_column_id(child, gap);
346
0
            }
347
0
        }
348
0
    }
349
350
    void _collect_slot_ref_column_id(VExprSPtr expr,
351
0
                                     std::vector<int>& output_slot_ref_indexs) const {
352
0
        for (const auto& child : expr->children()) {
353
0
            if (child->is_slot_ref()) {
354
0
                const auto* ref = static_cast<VSlotRef*>(child.get());
355
0
                output_slot_ref_indexs.push_back(ref->column_id());
356
0
            } else {
357
0
                _collect_slot_ref_column_id(child, output_slot_ref_indexs);
358
0
            }
359
0
        }
360
0
    }
361
362
    void _extend_data(std::vector<MutableColumnPtr>& columns, const Block* block,
363
                      int current_repeat_times, int size, int64_t current_row_idx,
364
0
                      const std::vector<int>& output_slot_ref_indexs) const {
365
0
        if (!current_repeat_times || !size) {
366
0
            return;
367
0
        }
368
0
        for (int i = 0; i < size; i++) {
369
0
            if (_contains_column_id(output_slot_ref_indexs, i)) {
370
0
                auto src_column =
371
0
                        block->get_by_position(i).column->convert_to_full_column_if_const();
372
0
                columns[i]->insert_many_from(*src_column, current_row_idx, current_repeat_times);
373
0
            } else {
374
                // must be column const
375
0
                DCHECK(is_column_const(*columns[i]));
376
0
                columns[i]->resize(columns[i]->size() + current_repeat_times);
377
0
            }
378
0
        }
379
0
    }
380
};
381
382
0
void register_function_array_map(doris::LambdaFunctionFactory& factory) {
383
0
    factory.register_function<ArrayMapFunction>();
384
0
}
385
386
#include "common/compile_check_end.h"
387
} // namespace doris