Coverage Report

Created: 2026-05-14 04:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_udf_meta.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_udf_meta.h"
19
20
#include <arrow/util/base64.h>
21
#include <fmt/core.h>
22
#include <rapidjson/stringbuffer.h>
23
#include <rapidjson/writer.h>
24
25
#include <sstream>
26
27
#include "common/status.h"
28
#include "format/arrow/arrow_utils.h"
29
#include "util/string_util.h"
30
31
namespace doris {
32
33
// Builds an Arrow schema containing one field per entry of `types`.
//
// The field at position i is named "arg<i>", carries the Arrow type that
// convert_to_arrow_type() produces for types[i] under `timezone`, and takes its
// nullability from types[i]->is_nullable(). On success the finished schema is
// stored in *schema; otherwise the first conversion or builder error is returned.
Status PythonUDFMeta::convert_types_to_schema(const DataTypes& types, const std::string& timezone,
                                              std::shared_ptr<arrow::Schema>* schema) {
    arrow::SchemaBuilder schema_builder;
    size_t position = 0;
    for (const auto& data_type : types) {
        std::shared_ptr<arrow::DataType> converted;
        RETURN_IF_ERROR(convert_to_arrow_type(data_type, &converted, timezone));

        // Fields are named after their argument position: arg0, arg1, ...
        const std::string field_name = "arg" + std::to_string(position);
        auto field = std::make_shared<arrow::Field>(field_name, converted,
                                                    data_type->is_nullable());
        RETURN_DORIS_STATUS_IF_ERROR(schema_builder.AddField(field));
        ++position;
    }
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(schema, schema_builder.Finish());
    return Status::OK();
}
46
47
// Serializes `schema` with arrow::ipc::SerializeSchema(), allocating from Arrow's
// default memory pool, and stores the resulting buffer in *out.
//
// `schema` is dereferenced unconditionally, so it must not be null.
Status PythonUDFMeta::serialize_arrow_schema(const std::shared_ptr<arrow::Schema>& schema,
                                             std::shared_ptr<arrow::Buffer>* out) {
    arrow::MemoryPool* const pool = arrow::default_memory_pool();
    const arrow::Schema& schema_ref = *schema;
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(out, arrow::ipc::SerializeSchema(schema_ref, pool));
    return Status::OK();
}
53
54
/*
55
    json format:
56
    {
57
        "name": "xxx",
58
        "id": 123,
59
        "symbol": "xxx",
60
        "location": "xxx",
61
        "udf_load_type": 0 or 1,
62
        "client_type": 0 (UDF) or 1 (UDAF) or 2 (UDTF),
63
        "runtime_version": "x.xx.xx",
64
        "always_nullable": true,
65
        "inline_code": "base64_inline_code",
66
        "input_types": "base64_input_types",
67
        "return_type": "base64_return_type"
68
    }
69
*/
70
4.97k
Status PythonUDFMeta::serialize_to_json(std::string* json_str) const {
71
4.97k
    rapidjson::Document doc;
72
4.97k
    doc.SetObject();
73
4.97k
    auto& allocator = doc.GetAllocator();
74
4.97k
    doc.AddMember("name", rapidjson::Value().SetString(name.c_str(), allocator), allocator);
75
4.97k
    doc.AddMember("id", rapidjson::Value().SetInt64(id), allocator);
76
4.97k
    doc.AddMember("symbol", rapidjson::Value().SetString(symbol.c_str(), allocator), allocator);
77
4.97k
    doc.AddMember("location", rapidjson::Value().SetString(location.c_str(), allocator), allocator);
78
4.97k
    doc.AddMember("udf_load_type", rapidjson::Value().SetInt(static_cast<int>(type)), allocator);
79
4.97k
    doc.AddMember("client_type", rapidjson::Value().SetInt(static_cast<int>(client_type)),
80
4.97k
                  allocator);
81
4.97k
    doc.AddMember("runtime_version",
82
4.97k
                  rapidjson::Value().SetString(runtime_version.c_str(), allocator), allocator);
83
4.97k
    doc.AddMember("always_nullable", rapidjson::Value().SetBool(always_nullable), allocator);
84
85
4.97k
    {
86
        // Serialize base64 inline code to json
87
4.97k
        std::string base64_str = arrow::util::base64_encode(inline_code);
88
4.97k
        doc.AddMember("inline_code", rapidjson::Value().SetString(base64_str.c_str(), allocator),
89
4.97k
                      allocator);
90
4.97k
    }
91
4.97k
    {
92
        // Serialize base64 input types to json
93
4.97k
        std::shared_ptr<arrow::Schema> input_schema;
94
4.97k
        RETURN_IF_ERROR(convert_types_to_schema(input_types, TimezoneUtils::default_time_zone,
95
4.97k
                                                &input_schema));
96
4.97k
        std::shared_ptr<arrow::Buffer> input_schema_buffer;
97
4.97k
        RETURN_IF_ERROR(serialize_arrow_schema(input_schema, &input_schema_buffer));
98
4.97k
        std::string base64_str =
99
4.97k
                arrow::util::base64_encode({input_schema_buffer->data_as<char>(),
100
4.97k
                                            static_cast<size_t>(input_schema_buffer->size())});
101
4.97k
        doc.AddMember("input_types", rapidjson::Value().SetString(base64_str.c_str(), allocator),
102
4.97k
                      allocator);
103
4.97k
    }
104
0
    {
105
        // Serialize base64 return type to json
106
4.97k
        std::shared_ptr<arrow::Schema> return_schema;
107
4.97k
        RETURN_IF_ERROR(convert_types_to_schema({return_type}, TimezoneUtils::default_time_zone,
108
4.97k
                                                &return_schema));
109
4.97k
        std::shared_ptr<arrow::Buffer> return_schema_buffer;
110
4.97k
        RETURN_IF_ERROR(serialize_arrow_schema(return_schema, &return_schema_buffer));
111
4.97k
        std::string base64_str =
112
4.97k
                arrow::util::base64_encode({return_schema_buffer->data_as<char>(),
113
4.97k
                                            static_cast<size_t>(return_schema_buffer->size())});
114
4.97k
        doc.AddMember("return_type", rapidjson::Value().SetString(base64_str.c_str(), allocator),
115
4.97k
                      allocator);
116
4.97k
    }
117
118
    // Convert document to json string
119
0
    rapidjson::StringBuffer buffer;
120
4.97k
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
121
4.97k
    doc.Accept(writer);
122
4.97k
    *json_str = std::string(buffer.GetString(), buffer.GetSize());
123
4.97k
    return Status::OK();
124
4.97k
}
125
126
1.93k
std::string PythonUDFMeta::to_string() const {
127
1.93k
    std::stringstream input_types_ss;
128
1.93k
    input_types_ss << "<";
129
4.96k
    for (size_t i = 0; i < input_types.size(); ++i) {
130
3.02k
        input_types_ss << input_types[i]->get_name();
131
3.02k
        if (i != input_types.size() - 1) {
132
1.09k
            input_types_ss << ", ";
133
1.09k
        }
134
3.02k
    }
135
1.93k
    input_types_ss << ">";
136
1.93k
    return fmt::format(
137
1.93k
            "[name: {}, symbol: {}, location: {}, runtime_version: {}, always_nullable: {}, "
138
1.93k
            "inline_code: {}][input_types: {}][return_type: {}]",
139
1.93k
            name, symbol, location, runtime_version, always_nullable, inline_code,
140
1.93k
            input_types_ss.str(), return_type->get_name());
141
1.93k
}
142
143
2.92k
// Validates this meta and returns InvalidArgument for the first failed rule,
// or OK when every rule passes. Rules are evaluated in this order:
//   1. name, symbol and runtime_version must be non-empty after trim();
//   2. input_types may be empty only if the client is not a UDAF and the load
//      type is not UNKNOWN;
//   3. return_type must be set;
//   4. the load type must not be UNKNOWN;
//   5. for MODULE load type, location and checksum must be non-empty after trim().
Status PythonUDFMeta::check() const {
    const auto is_blank = [](const std::string& text) { return trim(text).empty(); };

    if (is_blank(name)) {
        return Status::InvalidArgument("Python UDF name is empty");
    }
    if (is_blank(symbol)) {
        return Status::InvalidArgument("Python UDF symbol is empty");
    }
    if (is_blank(runtime_version)) {
        return Status::InvalidArgument("Python UDF runtime version is empty");
    }

    // NOTE(review): an UNKNOWN load type with no input types is reported with the
    // UDAF message below, not the load-type message further down - confirm this
    // is intended.
    const bool inputs_required =
            client_type == PythonClientType::UDAF || type == PythonUDFLoadType::UNKNOWN;
    if (inputs_required && input_types.empty()) {
        return Status::InvalidArgument("Python UDAF input types is empty");
    }

    if (return_type == nullptr) {
        return Status::InvalidArgument("Python UDF return type is empty");
    }

    if (type == PythonUDFLoadType::UNKNOWN) {
        return Status::InvalidArgument(
                "Python UDF load type is invalid, please check inline code or file path");
    }

    // Only module-based UDFs need a location and checksum.
    if (type != PythonUDFLoadType::MODULE) {
        return Status::OK();
    }
    if (is_blank(location)) {
        return Status::InvalidArgument("Non-inline Python UDF location is empty");
    }
    if (is_blank(checksum)) {
        return Status::InvalidArgument("Non-inline Python UDF checksum is empty");
    }
    return Status::OK();
}
181
182
} // namespace doris