Coverage Report

Created: 2026-03-15 20:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_java_udf.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exprs/function/function_java_udf.h"
19
20
#include <bthread/bthread.h>
21
22
#include <memory>
23
#include <string>
24
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "exec/connector/jni_connector.h"
28
#include "jni.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/user_function_cache.h"
31
#include "util/jni-util.h"
32
33
// JNI (slash-separated) name of the Java class that JavaFunctionCall::open
// looks up and instantiates to run the UDF.
const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
34
// JNI descriptor of the executor constructor ("<init>"): takes a byte[] and
// returns void. open() passes the thrift-serialized constructor params here.
const char* EXECUTOR_CTOR_SIGNATURE = "([B)V";
35
// JNI descriptor of the executor's "evaluate" method: takes two java.util.Map
// arguments (input parameters, output parameters) and returns a long.
const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
36
// JNI descriptor of the executor's "close" method: no arguments, returns void.
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
37
38
namespace doris {
39
40
// Stores copies of the thrift function descriptor, the argument types and the
// return type. No JNI work is done here; the Java side is set up in open().
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
                                   const DataTypePtr& return_type)
        : fn_(fn),
          _argument_types(argument_types),
          _return_type(return_type) {
}
43
44
0
// Prepares the per-thread JNI state used to run the Java UDF.
//
// A JNI environment is requested on every call and a failure to obtain it is
// returned as-is. Everything else happens only for the THREAD_LOCAL scope:
//   1. a JniContext is created and registered on `context`;
//   2. when the function carries both a location and a checksum, the jar is
//      resolved to a local path through UserFunctionCache;
//   3. the executor class and its constructor / evaluate / close methods are
//      looked up;
//   4. the constructor params are thrift-serialized and used to construct the
//      Java executor object.
// `open_successes` is set only after all of the steps above succeeded. For any
// other scope the function returns OK without further work.
Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    if (scope != FunctionContext::FunctionStateScope::THREAD_LOCAL) {
        return Status::OK();
    }

    SCOPED_TIMER(context->get_udf_execute_timer());

    // The state is attached to the context before it is fully initialised, so
    // it stays reachable through the context even if a later step fails.
    auto udf_state = std::make_shared<JniContext>();
    context->set_function_state(FunctionContext::THREAD_LOCAL, udf_state);

    TJavaUdfExecutorCtorParams executor_params;
    executor_params.__set_fn(fn_);

    // Resolve the jar to a local path only when BOTH the location and the
    // checksum are present; otherwise no location is set on the params.
    auto jar_cache = UserFunctionCache::instance();
    std::string jar_path;
    const bool has_jar_to_fetch = !(fn_.hdfs_location.empty() || fn_.checksum.empty());
    if (has_jar_to_fetch) {
        RETURN_IF_ERROR(
                jar_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, &jar_path));
        executor_params.__set_location(jar_path);
    }

    // Look up the executor class and cache the method ids used later by
    // execute_impl() and by the context's close().
    auto& executor_class = udf_state->executor_cl;
    RETURN_IF_ERROR(Jni::Util::find_class(env, EXECUTOR_CLASS, &executor_class));
    RETURN_IF_ERROR(executor_class.get_method(env, "<init>", EXECUTOR_CTOR_SIGNATURE,
                                              &udf_state->executor_ctor_id));
    RETURN_IF_ERROR(executor_class.get_method(env, "evaluate", EXECUTOR_EVALUATE_SIGNATURE,
                                              &udf_state->executor_evaluate_id));
    RETURN_IF_ERROR(executor_class.get_method(env, "close", EXECUTOR_CLOSE_SIGNATURE,
                                              &udf_state->executor_close_id));

    // The Java constructor takes the thrift-serialized params as a byte[].
    Jni::LocalArray serialized_params;
    RETURN_IF_ERROR(Jni::Util::SerializeThriftMsg(env, &executor_params, &serialized_params));
    RETURN_IF_ERROR(executor_class.new_object(env, udf_state->executor_ctor_id)
                            .with_arg(serialized_params)
                            .call(&udf_state->executor));

    udf_state->open_successes = true;
    return Status::OK();
}
83
84
// Evaluates the Java UDF over `num_rows` rows of `block`.
//
// The argument columns are exported as a Java table; its address and schema
// are handed to the executor's evaluate method in one string map, and the
// description of the result column (nullability, field name, type) in a
// second one. evaluate returns a long, which is passed to
// JniConnector::fill_block to populate the `result` column.
//
// Uses the JniContext stored for the THREAD_LOCAL scope by open().
// Returns the first error from any JNI / connector step, otherwise the status
// of fill_block.
Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
                                      const ColumnNumbers& arguments, uint32_t result,
                                      size_t num_rows) const {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    auto* udf_state = reinterpret_cast<JniContext*>(
            context->get_function_state(FunctionContext::THREAD_LOCAL));
    SCOPED_TIMER(context->get_udf_execute_timer());

    // --- input side: export the argument columns and describe them ---
    std::unique_ptr<long[]> input_meta;
    RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_meta));
    auto input_schema = JniConnector::parse_table_schema(&block, arguments, true);

    // The Java side receives the table address as a decimal string.
    const long input_meta_address = reinterpret_cast<long>(input_meta.get());
    std::map<String, String> input_params = {
            {"meta_address", std::to_string(input_meta_address)},
            {"required_fields", input_schema.first},
            {"columns_types", input_schema.second}};
    Jni::LocalObject java_input_params;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &java_input_params));

    // --- output side: describe the result column ---
    auto output_schema = JniConnector::parse_table_schema(&block, {result}, true);
    std::string result_is_nullable;
    if (block.get_by_position(result).type->is_nullable()) {
        result_is_nullable = "true";
    } else {
        result_is_nullable = "false";
    }
    std::map<String, String> output_params = {{"is_nullable", result_is_nullable},
                                              {"required_fields", output_schema.first},
                                              {"columns_types", output_schema.second}};
    Jni::LocalObject java_output_params;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, output_params, &java_output_params));

    // --- call evaluate(input, output) and read the result back ---
    long result_address = 0;
    RETURN_IF_ERROR(udf_state->executor.call_long_method(env, udf_state->executor_evaluate_id)
                            .with_arg(java_input_params)
                            .with_arg(java_output_params)
                            .call(&result_address));

    return JniConnector::fill_block(&block, {result}, result_address);
}
118
119
// Closes the JniContext registered for the THREAD_LOCAL scope, if any.
//
// When the caller is not a bthread (bthread_self() == 0) the context is closed
// inline. Otherwise the close is submitted to the udf_close_workers pool and
// this function blocks until that task finishes, returning its status.
// `scope` is not consulted.
Status JavaFunctionCall::close(FunctionContext* context,
                               FunctionContext::FunctionStateScope scope) {
    // The JniContext owns resources that must be released here, before this
    // JavaFunctionCall is destroyed.
    auto release_jni_state = [context]() -> Status {
        auto* udf_state = reinterpret_cast<JniContext*>(
                context->get_function_state(FunctionContext::THREAD_LOCAL));
        if (udf_state == nullptr) {
            return Status::OK();
        }
        RETURN_IF_ERROR(udf_state->close());
        return Status::OK();
    };

    const bool called_from_bthread = bthread_self() != 0;
    if (!called_from_bthread) {
        return release_jni_state();
    }

    DorisMetrics::instance()->udf_close_bthread_count->increment(1);

    // Hand the close over to the close_workers pthread pool. The packaged_task
    // is held in a shared_ptr so it can be captured by the submitted callable.
    auto close_task =
            std::make_shared<std::packaged_task<Status()>>(std::move(release_jni_state));
    auto close_result = close_task->get_future();
    RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
            [close_task]() { (*close_task)(); }));

    // Wait for the pooled close to finish and propagate its status.
    RETURN_IF_ERROR(close_result.get());
    return Status::OK();
}
145
} // namespace doris