Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/table_function/python_udtf_function.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exprs/table_function/python_udtf_function.h"
19
20
#include <arrow/array.h>
21
#include <arrow/array/array_nested.h>
22
#include <arrow/record_batch.h>
23
#include <arrow/type_fwd.h>
24
#include <glog/logging.h>
25
26
#include "core/assert_cast.h"
27
#include "core/block/block.h"
28
#include "core/block/column_numbers.h"
29
#include "core/column/column.h"
30
#include "core/column/column_array.h"
31
#include "core/column/column_nullable.h"
32
#include "core/data_type/data_type_array.h"
33
#include "core/data_type/data_type_factory.hpp"
34
#include "core/data_type_serde/data_type_array_serde.h"
35
#include "exprs/function/array/function_array_utils.h"
36
#include "exprs/vexpr.h"
37
#include "exprs/vexpr_context.h"
38
#include "format/arrow/arrow_block_convertor.h"
39
#include "format/arrow/arrow_row_batch.h"
40
#include "format/arrow/arrow_utils.h"
41
#include "runtime/runtime_state.h"
42
#include "runtime/user_function_cache.h"
43
#include "udf/python/python_env.h"
44
#include "udf/python/python_server.h"
45
#include "udf/python/python_udf_meta.h"
46
#include "util/timezone_utils.h"
47
48
namespace doris {
49
#include "common/compile_check_begin.h"
50
51
0
/// Constructs a Python UDTF table function from its Thrift descriptor.
///
/// Initializes `_t_fn` from `t_fn`, records the function name, resolves the
/// default time zone into `_timezone_obj`, and derives `_return_type`.
///
/// @param t_fn Thrift function descriptor; `ret_type` carries the element type T.
PythonUDTFFunction::PythonUDTFFunction(const TFunction& t_fn) : TableFunction(), _t_fn(t_fn) {
    _fn_name = _t_fn.name.function_name;
    // The result of the time zone lookup is not inspected here.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _timezone_obj);

    // Like Java UDTF, FE passes only the element type T. Wrap it here so the
    // function's return type becomes nullable(array<nullable(T)>).
    const DataTypePtr elem_type = DataTypeFactory::instance().create_data_type(t_fn.ret_type);
    const DataTypePtr nullable_elem_type = make_nullable(elem_type);
    _return_type = make_nullable(std::make_shared<DataTypeArray>(nullable_elem_type));
}
60
61
0
/// Builds the Python UDF metadata for this function and obtains a UDTF client.
///
/// The load type is INLINE when `function_code` is set, MODULE when only
/// `hdfs_location` is set, and UNKNOWN otherwise. For MODULE the location is
/// resolved through `UserFunctionCache::get_pypath`.
///
/// @return InvalidArgument when no runtime version is set, InternalError when
///         no client was produced, any error propagated from the called
///         helpers, or OK on success.
Status PythonUDTFFunction::open() {
    PythonUDFMeta meta;
    meta.id = _t_fn.id;
    meta.name = _t_fn.name.function_name;
    meta.symbol = _t_fn.scalar_fn.symbol;
    meta.client_type = PythonClientType::UDTF;

    const bool has_inline_code = !_t_fn.function_code.empty();
    const bool has_module_location = !_t_fn.hdfs_location.empty();
    if (has_inline_code) {
        meta.type = PythonUDFLoadType::INLINE;
        meta.location = "inline";
        meta.inline_code = _t_fn.function_code;
    } else if (has_module_location) {
        meta.type = PythonUDFLoadType::MODULE;
        meta.location = _t_fn.hdfs_location;
        meta.checksum = _t_fn.checksum;
        // `meta.location` is passed both as the input location and as the
        // output argument of get_pypath.
        RETURN_IF_ERROR(UserFunctionCache::instance()->get_pypath(meta.id, meta.location,
                                                                  meta.checksum, &meta.location));
    } else {
        meta.type = PythonUDFLoadType::UNKNOWN;
        meta.location = "unknown";
    }

    PythonVersion version;
    const bool has_runtime_version =
            _t_fn.__isset.runtime_version && !_t_fn.runtime_version.empty();
    if (!has_runtime_version) {
        return Status::InvalidArgument("Python UDTF runtime version is not set");
    }
    RETURN_IF_ERROR(PythonVersionManager::instance().get_version(_t_fn.runtime_version, &version));
    meta.runtime_version = version.full_version;

    for (const auto& thrift_arg_type : _t_fn.arg_types) {
        meta.input_types.push_back(DataTypeFactory::instance().create_data_type(thrift_arg_type));
    }

    // FE passes the element type T (like Java UDTF); the Python metadata uses
    // it directly as the return type, without the array wrapper.
    meta.return_type = DataTypeFactory::instance().create_data_type(_t_fn.ret_type);
    meta.always_nullable = meta.return_type->is_nullable();
    RETURN_IF_ERROR(meta.check());

    RETURN_IF_ERROR(PythonServerManager::instance().get_client(meta, version, &_udtf_client));
    if (!_udtf_client) {
        return Status::InternalError("Failed to create Python UDTF client");
    }
    return Status::OK();
}
117
118
0
/// Evaluates the Python UDTF for every row of `block` and caches the result.
///
/// The child expressions are executed against `block`, their result columns
/// are converted to an Arrow record batch, and the batch is sent to the Python
/// UDTF client. The returned ListArray is converted into `_array_result_column`
/// and its layout is extracted into `_array_column_detail` for use by
/// `process_row` / `get_value`.
///
/// @param block Input block; child expression results are looked up in it.
/// @param state Runtime state (not used by this implementation).
/// @return OK on success; InternalError if the result row count does not match
///         the input row count; NotSupported if the array layout cannot be
///         extracted; otherwise the first error from a called helper.
Status PythonUDTFFunction::process_init(Block* block, RuntimeState* state) {
    // Step 1: Evaluate child expressions and remember their result column positions.
    const size_t child_size = _expr_context->root()->children().size();
    ColumnNumbers child_column_idxs;
    child_column_idxs.resize(child_size);
    for (size_t i = 0; i < child_size; ++i) {
        int result_id = -1;
        RETURN_IF_ERROR(_expr_context->root()->children()[i]->execute(_expr_context.get(), block,
                                                                      &result_id));
        DCHECK_NE(result_id, -1);
        child_column_idxs[i] = result_id;
    }

    // Step 2: Build the input block and convert it to Arrow format.
    Block input_block;
    for (const auto column_idx : child_column_idxs) {
        input_block.insert(block->get_by_position(column_idx));
    }
    std::shared_ptr<arrow::Schema> input_schema;
    std::shared_ptr<arrow::RecordBatch> input_batch;
    RETURN_IF_ERROR(get_arrow_schema_from_block(input_block, &input_schema,
                                                TimezoneUtils::default_time_zone));
    RETURN_IF_ERROR(convert_to_arrow_batch(input_block, input_schema, arrow::default_memory_pool(),
                                           &input_batch, _timezone_obj));

    // Step 3: Evaluate all rows at once (similar to Java UDTF's JNI call).
    // Python returns a ListArray where each element holds the outputs of one input row.
    std::shared_ptr<arrow::ListArray> list_array;
    RETURN_IF_ERROR(_udtf_client->evaluate(*input_batch, &list_array));

    // Step 4: Convert the Python server output (ListArray) to a Doris array column.
    RETURN_IF_ERROR(_convert_list_array_to_array_column(list_array));

    // process_row() indexes the array offsets by input row index, so the result
    // must contain exactly one entry per input row. Reject a mismatched reply
    // here instead of reading out of bounds later.
    const size_t expected_rows = block->rows();
    if (_array_result_column->size() != expected_rows) {
        return Status::InternalError(
                "Python UDTF {} returned {} result rows, but {} input rows were expected",
                _fn_name, _array_result_column->size(), expected_rows);
    }

    // Step 5: Extract the array column layout (offsets, nested column, null maps).
    if (!extract_column_array_info(*_array_result_column, _array_column_detail)) {
        return Status::NotSupported("column type {} not supported now",
                                    _array_result_column->get_name());
    }

    return Status::OK();
}
159
160
0
void PythonUDTFFunction::process_row(size_t row_idx) {
161
0
    TableFunction::process_row(row_idx);
162
163
    // Check if array is null for this row
164
0
    if (!_array_column_detail.array_nullmap_data ||
165
0
        !_array_column_detail.array_nullmap_data[row_idx]) {
166
0
        _array_offset = (*_array_column_detail.offsets_ptr)[row_idx - 1];
167
0
        _cur_size = (*_array_column_detail.offsets_ptr)[row_idx] - _array_offset;
168
0
    }
169
    // When it's NULL at row_idx, _cur_size stays 0, meaning current_empty()
170
    // If outer function: will continue with insert_default
171
    // If not outer function: will not insert any value
172
0
}
173
174
0
void PythonUDTFFunction::process_close() {
175
0
    _array_result_column = nullptr;
176
0
    _array_column_detail.reset();
177
0
    _array_offset = 0;
178
0
}
179
180
0
/// Appends `length` copies of the current output value to `column`.
///
/// Defaults are appended when the current row produced no values or when the
/// current nested element is marked NULL; otherwise the nested element at the
/// current position is repeated.
///
/// @param column Destination column; must be a ColumnNullable when `_is_nullable`.
/// @param length Number of copies to append.
void PythonUDTFFunction::get_same_many_values(MutableColumnPtr& column, int length) {
    const size_t src_pos = _array_offset + _cur_offset;
    const auto nested_nulls = _array_column_detail.nested_nullmap_data;

    if (current_empty() || (nested_nulls && nested_nulls[src_pos])) {
        column->insert_many_defaults(length);
        return;
    }

    if (!_is_nullable) {
        column->insert_many_from(*_array_column_detail.nested_col, src_pos, length);
        return;
    }

    // Nullable destination: write the value into the nested column and append
    // `length` default entries to the null map.
    auto* dst_nullable = assert_cast<ColumnNullable*>(column.get());
    auto dst_nested = dst_nullable->get_nested_column_ptr();
    auto dst_null_map = dst_nullable->get_null_map_column_ptr();
    dst_nested->insert_many_from(*_array_column_detail.nested_col, src_pos, length);
    assert_cast<ColumnUInt8*>(dst_null_map.get())->insert_many_defaults(length);
}
197
198
0
/// Appends up to `max_step` output values of the current row to `column` and
/// advances the cursor by the number of values produced.
///
/// When the current row is empty a single default value is appended and the
/// step count is 1.
///
/// @param column   Destination column; must be a ColumnNullable when `_is_nullable`.
/// @param max_step Upper bound on the number of values to emit.
/// @return Number of values appended (and steps forwarded).
int PythonUDTFFunction::get_value(MutableColumnPtr& column, int max_step) {
    // Never emit more than what is left in the current array.
    max_step = std::min(max_step, static_cast<int>(_cur_size - _cur_offset));
    const size_t src_pos = _array_offset + _cur_offset;

    if (current_empty()) {
        column->insert_default();
        max_step = 1;
    } else if (!_is_nullable) {
        column->insert_range_from(*_array_column_detail.nested_col, src_pos, max_step);
    } else {
        auto* dst_nullable = assert_cast<ColumnNullable*>(column.get());
        auto dst_nested = dst_nullable->get_nested_column_ptr();
        auto* dst_null_map =
                assert_cast<ColumnUInt8*>(dst_nullable->get_null_map_column_ptr().get());

        dst_nested->insert_range_from(*_array_column_detail.nested_col, src_pos, max_step);

        // Copy the matching slice of the nested null map so element-level
        // NULLs carry over to the destination.
        const size_t prev_size = dst_null_map->size();
        dst_null_map->resize(prev_size + max_step);
        memcpy(dst_null_map->get_data().data() + prev_size,
               _array_column_detail.nested_nullmap_data + src_pos, max_step * sizeof(UInt8));
    }

    forward(max_step);
    return max_step;
}
225
226
0
/// Closes the Python UDTF client (if any) and then the base table function.
///
/// A failure to close the client is only logged as a warning; the returned
/// status is that of `TableFunction::close()`.
Status PythonUDTFFunction::close() {
    if (!_udtf_client) {
        return TableFunction::close();
    }

    if (Status client_status = _udtf_client->close(); !client_status.ok()) {
        LOG(WARNING) << "Failed to close UDTF client: " << client_status.to_string();
    }
    _udtf_client.reset();

    return TableFunction::close();
}
238
239
/// Converts the ListArray returned by the Python UDTF into a Doris array
/// column and stores it in `_array_result_column`.
///
/// @param list_array Arrow ListArray with one list per input row.
/// @return InternalError when `list_array` is null, an error propagated from
///         the Arrow deserialization, or OK on success.
Status PythonUDTFFunction::_convert_list_array_to_array_column(
        const std::shared_ptr<arrow::ListArray>& list_array) {
    if (list_array == nullptr) {
        return Status::InternalError("Received null ListArray from Python UDTF");
    }

    const size_t row_count = list_array->length();

    // Create the destination column and locate the ColumnArray inside it,
    // unwrapping the nullable layer when the return type has one.
    MutableColumnPtr result_col = _return_type->create_column();
    ColumnNullable* outer_nullable = _return_type->is_nullable()
                                             ? assert_cast<ColumnNullable*>(result_col.get())
                                             : nullptr;
    ColumnArray* arrays =
            outer_nullable != nullptr
                    ? assert_cast<ColumnArray*>(
                              outer_nullable->get_nested_column_ptr()->assume_mutable().get())
                    : assert_cast<ColumnArray*>(result_col.get());

    // Build the array<nullable(T)> serde used for the direct Arrow conversion.
    DataTypePtr elem_type = DataTypeFactory::instance().create_data_type(_t_fn.ret_type);
    DataTypePtr array_type = std::make_shared<DataTypeArray>(make_nullable(elem_type));
    auto serde = array_type->get_serde();

    // The Python server already sends a ListArray in the expected layout, so
    // it is read straight into the ColumnArray without struct unwrapping.
    RETURN_IF_ERROR(serde->read_column_from_arrow(arrays->assume_mutable_ref(), list_array.get(),
                                                  0, row_count, _timezone_obj));

    // Every row of the outer nullable wrapper is marked non-null
    // (an empty array [] is non-null, which differs from NULL).
    if (outer_nullable != nullptr) {
        outer_nullable->get_null_map_data().resize_fill(row_count, 0);
    }

    _array_result_column = std::move(result_col);
    return Status::OK();
}
281
282
#include "common/compile_check_end.h"
283
} // namespace doris