Coverage Report

Created: 2026-06-28 23:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/lambda_function/varray_map_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <algorithm>
19
#include <memory>
20
#include <set>
21
#include <string>
22
#include <vector>
23
24
#include "common/status.h"
25
#include "core/assert_cast.h"
26
#include "core/block/block.h"
27
#include "core/block/column_numbers.h"
28
#include "core/block/column_with_type_and_name.h"
29
#include "core/block/columns_with_type_and_name.h"
30
#include "core/column/column.h"
31
#include "core/column/column_array.h"
32
#include "core/column/column_nothing.h"
33
#include "core/column/column_nullable.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_array.h"
37
#include "core/data_type/data_type_nullable.h"
38
#include "core/data_type/data_type_number.h"
39
#include "exec/common/util.hpp"
40
#include "exprs/aggregate/aggregate_function.h"
41
#include "exprs/lambda_function/lambda_execution_context.h"
42
#include "exprs/lambda_function/lambda_function.h"
43
#include "exprs/lambda_function/lambda_function_factory.h"
44
#include "exprs/vcolumn_ref.h"
45
#include "exprs/vexpr_context.h"
46
#include "exprs/vlambda_function_expr.h"
47
48
namespace doris {
49
50
// extend a block with all required parameters
51
struct LambdaArgs {
52
    // which line is extended to the original block
53
    int64_t current_row_idx = 0;
54
    // when a block is filled, the array may be truncated, recording where it was truncated
55
    int64_t current_offset_in_array = 0;
56
    // the beginning position of the array
57
    size_t array_start = 0;
58
    // the size of the array
59
    int64_t cur_size = 0;
60
    // offset of column array
61
    const ColumnArray::Offsets64* offsets_ptr = nullptr;
62
    // expend data of repeat times
63
    int current_repeat_times = 0;
64
    // whether the current row of the original block has been extended
65
    bool current_row_eos = false;
66
};
67
68
class ArrayMapFunction : public LambdaFunction {
69
    ENABLE_FACTORY_CREATOR(ArrayMapFunction);
70
71
public:
72
27
    ~ArrayMapFunction() override = default;
73
74
    static constexpr auto name = "array_map";
75
76
27
    // Factory hook: builds a fresh, shared ArrayMapFunction instance.
    static LambdaFunctionPtr create() {
        return std::make_shared<ArrayMapFunction>();
    }
77
78
0
    // Returns the function name stored in the `name` constant ("array_map").
    std::string get_name() const override {
        return std::string(name);
    }
79
80
27
    // Runs the base-class preparation and then works out how the lambda's arguments are
    // bound. children[0] is the lambda expression; every following child is one array
    // argument, so the lambda is expected to take children.size() - 1 arguments.
    Status prepare(RuntimeState* state, const VExprSPtrs& children) override {
        RETURN_IF_ERROR(LambdaFunction::prepare(state, children));
        DCHECK_GE(children.size(), 2);

        const auto& lambda_expr = children[0];
        const size_t array_argument_count = children.size() - 1;
        return _prepare_lambda_argument_binding(lambda_expr, array_argument_count,
                                                _lambda_argument_binding);
    }
87
88
    // Evaluates array_map(lambda, arr1, arr2, ...) for `count` rows of `block`.
    //
    // children[0] is the lambda expression; children[1..] produce the array arguments.
    //   1. Execute every array argument against the input block.
    //   2. Strip the optional Nullable wrapper of each argument, merge the outer null maps
    //      and verify that all arrays have exactly the same offsets as the first one.
    //   3. Describe the lambda block: positions [0, lambda_argument_base) mirror the input
    //      block (placeholders for columns the lambda does not read), followed by one
    //      column per array argument that receives its flattened elements.
    //   4. Feed the flattened elements to the lambda body in batches and collect the results.
    //   5. Wrap the collected results into an array column that reuses the first argument's
    //      offsets, and into a Nullable column when `result_type` is nullable.
    //
    // Returns any error reported by a child expression or helper, InvalidArgument when the
    // array arguments do not share the same shape, and InternalError when the lambda refers
    // to a column id outside the input block. On success `result_column` holds the result.
    Status execute(VExprContext* context, const Block* block, const Selector* expr_selector,
                   size_t count, ColumnPtr& result_column, const DataTypePtr& result_type,
                   const VExprSPtrs& children) const override {
        LambdaArgs args_info;

        // 1. array_map(lambda, arg1, arg2, ...): execute child[1:end] on the source block.
        ColumnsWithTypeAndName arguments(children.size() - 1);
        for (size_t i = 1; i < children.size(); ++i) {
            ColumnPtr column;
            RETURN_IF_ERROR(
                    children[i]->execute_column(context, block, expr_selector, count, column));
            arguments[i - 1].column = column;
            arguments[i - 1].type = children[i]->execute_type(block);
            arguments[i - 1].name = children[i]->expr_name();
        }

        // Union of the outer (row level) null maps of all nullable array arguments.
        auto outside_null_map = ColumnUInt8::create(
                arguments[0].column->convert_to_full_column_if_const()->size(), 0);
        // Offsets column of the first array argument; reused as the offsets of the result.
        ColumnPtr array_column_offset;
        size_t nested_array_column_rows = 0;
        ColumnPtr first_array_offsets = nullptr;

        // 2. Extract the nested (element) column and element type of every array argument.
        std::vector<ColumnPtr> lambda_datas(arguments.size());
        DataTypes lambda_argument_types(arguments.size());

        for (size_t i = 0; i < arguments.size(); ++i) {
            const auto& array_column_type_name = arguments[i];
            auto column_array = array_column_type_name.column->convert_to_full_column_if_const();
            auto type_array = array_column_type_name.type;
            if (type_array->is_nullable()) {
                // Hold the null map column by pointer instead of by reference, because
                // `column_array` is reassigned (and the old column released) just below.
                DORIS_CHECK(is_column_nullable(*column_array));
                auto column_array_nullmap =
                        assert_cast<const ColumnNullable&>(*column_array).get_null_map_column_ptr();

                // Unwrap the array column from the nullable column.
                column_array = assert_cast<const ColumnNullable*>(column_array.get())
                                       ->get_nested_column_ptr();

                // Unwrap the array type from the nullable type.
                type_array = assert_cast<const DataTypeNullable*>(array_column_type_name.type.get())
                                     ->get_nested_type();

                // A row is NULL in the result if it is NULL in any argument.
                VectorizedUtils::update_null_map(outside_null_map->get_data(),
                                                 column_array_nullmap->get_data());
            }

            // From here on `column_array` is the plain array column.
            const auto& col_array = assert_cast<const ColumnArray&>(*column_array);

            if (i == 0) {
                nested_array_column_rows = col_array.get_data_ptr()->size();
                first_array_offsets = col_array.get_offsets_ptr();
                array_column_offset = first_array_offsets;
                // NOTE(review): `column_array` goes out of scope at the end of this iteration;
                // this pointer presumably stays valid because `first_array_offsets` keeps the
                // offsets column alive - confirm against ColumnArray::get_offsets().
                args_info.offsets_ptr = &col_array.get_offsets();
            } else {
                // Every further argument must have exactly the same shape as the first one,
                // e.g. select array_map((x,y)->x+y,c_array1,[0,1,2,3]) from array_test2;
                // with c_array1: [0,1,2,3,4,5,6,7,8,9] is rejected.
                const auto& first_offsets =
                        assert_cast<const ColumnArray::ColumnOffsets&>(*first_array_offsets)
                                .get_data();
                const auto& current_offsets = col_array.get_offsets();
                // Compare the lengths first so that memcmp never reads past the end of the
                // shorter offsets array.
                if (nested_array_column_rows != col_array.get_data_ptr()->size() ||
                    first_offsets.size() != current_offsets.size() ||
                    (!first_offsets.empty() &&
                     memcmp(first_offsets.data(), current_offsets.data(),
                            sizeof(first_offsets[0]) * first_offsets.size()) != 0)) {
                    return Status::InvalidArgument(
                            "in array map function, the input column size "
                            "are "
                            "not equal completely, nested column data rows 1st size is {}, {}th "
                            "size is {}.",
                            nested_array_column_rows, i + 1, col_array.get_data_ptr()->size());
                }
            }
            lambda_datas[i] = col_array.get_data_ptr();
            const auto& col_type = assert_cast<const DataTypeArray&>(*type_array);
            lambda_argument_types[i] = col_type.get_nested_type();
        }

        // 3. Work out the layout of the lambda block. Input columns keep their original
        // positions; the lambda arguments are appended right after the highest referenced
        // input position.
        std::set<int> required_input_column_ids;
        children[0]->collect_slot_column_ids(required_input_column_ids);
        context->lambda_execution_context().collect_visible_binding_column_positions(
                required_input_column_ids);
        const int lambda_argument_base =
                required_input_column_ids.empty() ? 0 : *required_input_column_ids.rbegin() + 1;
        if (!_lambda_argument_binding.bind_by_name) {
            RETURN_IF_ERROR(
                    _set_legacy_lambda_argument_gap(children[0]->get_child(0), lambda_argument_base,
                                                    _lambda_argument_binding.argument_size));
        }
        std::vector<std::string> names(lambda_argument_base);
        DataTypes data_types(lambda_argument_base);
        std::vector<bool> materialized_input_columns(lambda_argument_base, false);
        names.reserve(lambda_argument_base + arguments.size());
        data_types.reserve(lambda_argument_base + arguments.size());
        for (int column_id : required_input_column_ids) {
            if (column_id < 0 || static_cast<size_t>(column_id) >= block->columns()) {
                return Status::InternalError(
                        "array_map lambda input column id {} is outside input block, block={}",
                        column_id, block->dump_structure());
            }
            materialized_input_columns[column_id] = true;
            names[column_id] = block->get_by_position(column_id).name;
            data_types[column_id] = block->get_by_position(column_id).type;
        }
        for (int i = 0; i < lambda_argument_base; ++i) {
            if (!materialized_input_columns[i]) {
                // Keep sparse input positions stable for SlotRef/parent lambda bindings without
                // materializing unrelated wide-table columns into every lambda batch.
                names[i] = "temp";
                data_types[i] = std::make_shared<DataTypeUInt8>();
            }
        }
        for (size_t i = 0; i < arguments.size(); ++i) {
            const auto& array_column_type_name = arguments[i];
            if (_lambda_argument_binding.bind_by_name &&
                i < _lambda_argument_binding.names.size()) {
                names.push_back(_lambda_argument_binding.names[i]);
            } else {
                names.push_back("R" + array_column_type_name.name);
            }
            data_types.push_back(lambda_argument_types[i]);
        }

        // Publish the argument bindings for the duration of this call; the guard object is
        // handed the frame and lives until the function returns.
        LambdaExecutionContext::Frame lambda_frame;
        lambda_frame.bind_by_name = _lambda_argument_binding.bind_by_name;
        lambda_frame.parent_bindings_visible = true;
        if (_lambda_argument_binding.bind_by_name) {
            for (size_t i = 0; i < _lambda_argument_binding.argument_size; ++i) {
                const int column_position = lambda_argument_base + static_cast<int>(i);
                lambda_frame.argument_bindings.push_back(
                        {_lambda_argument_binding.names[i], column_position});
            }
        }
        LambdaExecutionContext::FrameGuard lambda_frame_guard(context->lambda_execution_context(),
                                                              std::move(lambda_frame));

        // If the nested data is empty (e.g. every array is NULL or empty) the lambda must not
        // be executed: for (x) -> concat('|', x + "1") the const literal '|' yields a column
        // of size 1 while the x column is empty, so the block rows would not match and the
        // process would crash. Return an array column with an empty nested column instead.
        if (std::ranges::any_of(lambda_datas, [](const auto& v) { return v->empty(); })) {
            DataTypePtr nested_type;
            bool is_nullable = result_type->is_nullable();
            if (is_nullable) {
                nested_type =
                        assert_cast<const DataTypeNullable*>(result_type.get())->get_nested_type();
            } else {
                nested_type = result_type;
            }
            auto empty_nested_column = assert_cast<const DataTypeArray*>(nested_type.get())
                                               ->get_nested_type()
                                               ->create_column();
            auto result_array_column =
                    ColumnArray::create(std::move(empty_nested_column), array_column_offset);

            if (is_nullable) {
                result_column = ColumnNullable::create(std::move(result_array_column),
                                                       std::move(outside_null_map));
            } else {
                result_column = std::move(result_array_column);
            }

            return Status::OK();
        }

        MutableColumnPtr result_col = nullptr;
        DataTypePtr res_type;

        // Position the cursor on the first row.
        // NOTE(review): for row 0 this reads offsets[-1]; it presumably relies on the offsets
        // container tolerating index -1 and yielding 0 - confirm.
        args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
        args_info.cur_size =
                (*args_info.offsets_ptr)[args_info.current_row_idx] - args_info.array_start;

        // 4. The lambda block is built once and its memory is reused for every batch.
        Block lambda_block;
        auto column_size = names.size();
        MutableColumns columns(column_size);
        do {
            bool mem_reuse = lambda_block.mem_reuse();
            for (size_t i = 0; i < column_size; i++) {
                if (mem_reuse) {
                    columns[i] = lambda_block.get_by_position(i).column->assert_mutable();
                } else {
                    columns[i] = data_types[i]->create_column();
                }
            }
            // Copy at most batch_size nested elements per round to bound memory usage.
            while (columns[lambda_argument_base]->size() < batch_size) {
                long max_step = batch_size - columns[lambda_argument_base]->size();
                long current_step = std::min(
                        max_step, static_cast<long>(args_info.cur_size -
                                                    args_info.current_offset_in_array));
                size_t pos = args_info.array_start + args_info.current_offset_in_array;
                for (size_t i = 0; i < arguments.size() && current_step > 0; ++i) {
                    columns[lambda_argument_base + i]->insert_range_from(*lambda_datas[i], pos,
                                                                         current_step);
                }
                args_info.current_offset_in_array += current_step;
                args_info.current_repeat_times += current_step;
                if (args_info.current_offset_in_array >= args_info.cur_size) {
                    args_info.current_row_eos = true;
                }
                // Repeat the outer row once per element that was just copied.
                _repeat_input_columns(columns, block, args_info.current_repeat_times,
                                      materialized_input_columns, args_info.current_row_idx);
                args_info.current_repeat_times = 0;
                if (args_info.current_row_eos) {
                    // The current array is exhausted: advance to the next row.
                    args_info.current_row_idx++;
                    args_info.current_offset_in_array = 0;
                    if (args_info.current_row_idx >= count) {
                        break;
                    }
                    args_info.current_row_eos = false;
                    args_info.array_start = (*args_info.offsets_ptr)[args_info.current_row_idx - 1];
                    args_info.cur_size = (*args_info.offsets_ptr)[args_info.current_row_idx] -
                                         args_info.array_start;
                }
            }

            if (!mem_reuse) {
                for (size_t i = 0; i < column_size; ++i) {
                    lambda_block.insert(
                            ColumnWithTypeAndName(std::move(columns[i]), data_types[i], names[i]));
                }
            }

            // Execute the lambda body (child[0]) on the internal lambda_block, not on the
            // original block. The outer expr_selector is irrelevant here, so pass nullptr.
            ColumnPtr res_col;
            RETURN_IF_ERROR(children[0]->execute_column(context, &lambda_block, nullptr,
                                                        lambda_block.rows(), res_col));
            res_col = res_col->convert_to_full_column_if_const();
            res_type = children[0]->execute_type(&lambda_block);

            if (!result_col) {
                result_col = res_col->clone_empty();
            }
            result_col->insert_range_from(*res_col, 0, res_col->size());
            lambda_block.clear_column_data(column_size);
        } while (args_info.current_row_idx < count);

        // 5. Reassemble the collected results into an array column. The nested column of the
        // result is always nullable, so a lambda result that is not nullable gets an all-zero
        // null map, e.g. select array_map(x -> x is null, [null, 1, 2]);
        if (result_type->is_nullable()) {
            if (res_type->is_nullable()) {
                result_column = ColumnNullable::create(
                        ColumnArray::create(std::move(result_col), array_column_offset),
                        std::move(outside_null_map));
            } else {
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);

                result_column = ColumnNullable::create(
                        ColumnArray::create(ColumnNullable::create(std::move(result_col),
                                                                   std::move(nested_null_map)),
                                            array_column_offset),
                        std::move(outside_null_map));
            }
        } else {
            if (res_type->is_nullable()) {
                result_column = ColumnArray::create(std::move(result_col), array_column_offset);
            } else {
                auto nested_null_map = ColumnUInt8::create(result_col->size(), 0);

                result_column = ColumnArray::create(
                        ColumnNullable::create(std::move(result_col), std::move(nested_null_map)),
                        array_column_offset);
            }
        }
        return Status::OK();
    }
362
363
private:
364
    // Describes how the lambda's formal arguments map onto the array arguments.
    struct LambdaArgumentBinding {
        // True when the lambda expression carries argument names; false selects the
        // legacy mode in which arguments are resolved through column references.
        bool bind_by_name {true};
        // Number of lambda arguments that are bound.
        size_t argument_size {0};
        // Argument names. In legacy mode this holds `argument_size` empty strings.
        std::vector<std::string> names;
    };
369
370
    // Fills `argument_binding` from the lambda expression `expr`.
    //
    // `expr` must be a LAMBDA_FUNCTION_EXPR node. `expected_argument_size` is the number of
    // array arguments passed to array_map.
    //   - Without argument names (legacy mode) the binding covers all expected arguments and
    //     the names are left empty; a nested lambda call in the body is rejected.
    //   - With argument names, the names are copied from the expression. There may be fewer
    //     names than arguments but not more, and no name may be empty.
    // Returns InternalError for the rejected cases, OK otherwise.
    Status _prepare_lambda_argument_binding(const VExprSPtr& expr, size_t expected_argument_size,
                                            LambdaArgumentBinding& argument_binding) const {
        DORIS_CHECK_EQ(expr->node_type(), TExprNodeType::LAMBDA_FUNCTION_EXPR);
        const auto* lambda_expr = assert_cast<const VLambdaFunctionExpr*>(expr.get());

        // Start from a clean state so that stale values never survive a re-prepare.
        auto& bound_names = argument_binding.names;
        bound_names.clear();
        argument_binding.argument_size = 0;
        argument_binding.bind_by_name = lambda_expr->has_argument_names();

        if (!argument_binding.bind_by_name) {
            const bool has_nested_lambda = _contains_nested_lambda_call(expr->get_child(0));
            if (has_nested_lambda) {
                return Status::InternalError(
                        "Cannot resolve nested lambda argument without lambda metadata");
            }
            argument_binding.argument_size = expected_argument_size;
            bound_names.resize(expected_argument_size);
            return Status::OK();
        }

        bound_names = lambda_expr->argument_names();
        if (bound_names.size() > expected_argument_size) {
            return Status::InternalError(
                    "lambda argument metadata size exceeds parameter size, maximum={}, actual={}",
                    expected_argument_size, bound_names.size());
        }
        argument_binding.argument_size = bound_names.size();
        for (const auto& argument_name : bound_names) {
            if (argument_name.empty()) {
                return Status::InternalError("lambda argument metadata contains empty name");
            }
        }
        return Status::OK();
    }
402
403
    // Legacy (unnamed) binding: walks the expression tree rooted at `expr` and, for every
    // column reference whose column id lies in [0, argument_size), sets its gap to
    // `lambda_argument_base`. Column references are leaves of the walk; their children are
    // not visited. Always returns OK unless a recursive call reports an error.
    // NOTE(review): the gap is presumably added to the column id when the reference is
    // resolved against the lambda block - confirm in VColumnRef.
    Status _set_legacy_lambda_argument_gap(const VExprSPtr& expr, int lambda_argument_base,
                                           size_t argument_size) const {
        if (!expr->is_column_ref()) {
            for (const auto& child : expr->children()) {
                RETURN_IF_ERROR(
                        _set_legacy_lambda_argument_gap(child, lambda_argument_base, argument_size));
            }
            return Status::OK();
        }

        auto* column_ref = static_cast<VColumnRef*>(expr.get());
        const int column_id = column_ref->column_id();
        const bool refers_to_lambda_argument =
                column_id >= 0 && static_cast<size_t>(column_id) < argument_size;
        if (refers_to_lambda_argument) {
            // base + argument_index - column_id, where argument_index == column_id.
            column_ref->set_gap(lambda_argument_base);
        }
        return Status::OK();
    }
420
421
9
    bool _is_lambda_call_with_lambda_expr(const VExprSPtr& expr) const {
422
9
        return expr->node_type() == TExprNodeType::LAMBDA_FUNCTION_CALL_EXPR &&
423
9
               !expr->children().empty() &&
424
9
               expr->children()[0]->node_type() == TExprNodeType::LAMBDA_FUNCTION_EXPR;
425
9
    }
426
427
9
    bool _contains_nested_lambda_call(const VExprSPtr& expr) const {
428
9
        if (_is_lambda_call_with_lambda_expr(expr)) {
429
1
            return true;
430
1
        }
431
8
        return std::ranges::any_of(expr->children(), [this](const auto& child) {
432
4
            return _contains_nested_lambda_call(child);
433
4
        });
434
9
    }
435
436
    void _repeat_input_columns(std::vector<MutableColumnPtr>& columns, const Block* block,
437
                               int repeat_times,
438
                               const std::vector<bool>& materialized_input_columns,
439
30
                               int64_t row_idx) const {
440
30
        if (!repeat_times || materialized_input_columns.empty()) {
441
10
            return;
442
10
        }
443
71
        for (size_t i = 0; i < materialized_input_columns.size(); i++) {
444
51
            if (!materialized_input_columns[i]) {
445
23
                columns[i]->resize(columns[i]->size() + repeat_times);
446
23
                continue;
447
23
            }
448
28
            DORIS_CHECK(block != nullptr);
449
28
            auto src_column = block->get_by_position(i).column->convert_to_full_column_if_const();
450
28
            if (check_and_get_column<ColumnNothing>(src_column.get())) {
451
                // A ColumnNothing in the outer block is a placeholder for an unmaterialized
452
                // virtual column. Keep it as a placeholder in the lambda block as well, so
453
                // VirtualSlotRef can still materialize it lazily if the lambda body reads it.
454
0
                if (!check_and_get_column<ColumnNothing>(columns[i].get())) {
455
0
                    columns[i] = ColumnNothing::create(columns[i]->size());
456
0
                }
457
0
            }
458
28
            columns[i]->insert_many_from(*src_column, row_idx, repeat_times);
459
28
        }
460
20
    }
461
462
    LambdaArgumentBinding _lambda_argument_binding;
463
};
464
465
1
void register_function_array_map(doris::LambdaFunctionFactory& factory) {
466
1
    factory.register_function<ArrayMapFunction>();
467
1
}
468
469
} // namespace doris