Coverage Report

Created: 2026-03-16 16:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_java_udf.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exprs/function/function_java_udf.h"
19
20
#include <bthread/bthread.h>
21
22
#include <memory>
23
#include <string>
24
25
#include "common/exception.h"
26
#include "core/block/block.h"
27
#include "format/jni/jni_data_bridge.h"
28
#include "jni.h"
29
#include "runtime/exec_env.h"
30
#include "runtime/user_function_cache.h"
31
#include "util/jni-util.h"
32
33
// JNI descriptors for the Java-side UDF executor driven by JavaFunctionCall.
// Slash-separated (JNI internal) name of the executor class.
char const* EXECUTOR_CLASS{"org/apache/doris/udf/UdfExecutor"};
// Constructor descriptor: takes a byte[] (the serialized ctor params), returns void.
char const* EXECUTOR_CTOR_SIGNATURE{"([B)V"};
// evaluate(Map input, Map output) -> long: the returned long is the output address.
char const* EXECUTOR_EVALUATE_SIGNATURE{"(Ljava/util/Map;Ljava/util/Map;)J"};
// close() -> void.
char const* EXECUTOR_CLOSE_SIGNATURE{"()V"};
37
38
namespace doris {
39
40
/// Capture the Thrift function descriptor together with the argument and return types.
/// The constructor does no JNI work; the Java executor is only created later, in open().
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
                                   const DataTypePtr& return_type)
        : fn_(fn),
          _argument_types(argument_types),
          _return_type(return_type) {}
43
44
5.48k
/// Prepare the Java-side executor for this UDF.
///
/// A JNI environment is obtained for every scope, and failing to obtain one is an error.
/// Only the THREAD_LOCAL scope does further work. It:
///   1. creates a JniContext and registers it as the context's thread-local function state;
///   2. resolves the executor class and its constructor / evaluate / close methods;
///   3. constructs the executor from the serialized TJavaUdfExecutorCtorParams.
/// `open_successes` is set only after every step has succeeded. Because the JniContext is
/// registered before construction starts, a failure part-way through still leaves it
/// reachable by close(), with `open_successes` still false.
Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    if (scope != FunctionContext::FunctionStateScope::THREAD_LOCAL) {
        return Status::OK();
    }

    SCOPED_TIMER(context->get_udf_execute_timer());
    auto ctx = std::make_shared<JniContext>();
    context->set_function_state(FunctionContext::THREAD_LOCAL, ctx);

    {
        auto cache = UserFunctionCache::instance();
        TJavaUdfExecutorCtorParams params;
        params.__set_fn(fn_);

        // Only when BOTH the jar location and its checksum are present is the jar resolved
        // through the user-function cache, and its local path passed to the executor.
        const bool has_jar = !fn_.hdfs_location.empty() && !fn_.checksum.empty();
        if (has_jar) {
            std::string jar_path;
            RETURN_IF_ERROR(
                    cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, &jar_path));
            params.__set_location(jar_path);
        }

        RETURN_IF_ERROR(Jni::Util::find_class(env, EXECUTOR_CLASS, &ctx->executor_cl));
        RETURN_IF_ERROR(ctx->executor_cl.get_method(env, "<init>", EXECUTOR_CTOR_SIGNATURE,
                                                    &ctx->executor_ctor_id));
        RETURN_IF_ERROR(ctx->executor_cl.get_method(env, "evaluate", EXECUTOR_EVALUATE_SIGNATURE,
                                                    &ctx->executor_evaluate_id));
        RETURN_IF_ERROR(ctx->executor_cl.get_method(env, "close", EXECUTOR_CLOSE_SIGNATURE,
                                                    &ctx->executor_close_id));

        // The executor constructor ("([B)V") receives the Thrift-serialized params as a byte[].
        Jni::LocalArray serialized_params;
        RETURN_IF_ERROR(Jni::Util::SerializeThriftMsg(env, &params, &serialized_params));
        RETURN_IF_ERROR(ctx->executor_cl.new_object(env, ctx->executor_ctor_id)
                                .with_arg(serialized_params)
                                .call(&ctx->executor));
    }

    ctx->open_successes = true;
    return Status::OK();
}
83
84
/// Evaluate the Java UDF over `num_rows` rows of `block`.
///
/// The argument columns are exported through JniDataBridge::to_java_table. The address of
/// that export ("meta_address") and the argument schema go into the input map. The result
/// column's schema and nullability go into the output map. Both maps are passed to the
/// executor's `evaluate` method, whose returned long is handed to
/// JniDataBridge::fill_block to populate column `result`.
///
/// @return an error if no successfully opened executor is registered for this thread, if any
///         JNI call fails, or if the output cannot be filled into the block.
Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
                                      const ColumnNumbers& arguments, uint32_t result,
                                      size_t num_rows) const {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));
    auto* jni_ctx = reinterpret_cast<JniContext*>(
            context->get_function_state(FunctionContext::THREAD_LOCAL));
    // open(THREAD_LOCAL) registers the context before it builds the executor. The state can
    // therefore be missing (open never ran) or half-initialised (open failed part-way).
    // Proceeding would dereference a null context or call into an unconstructed executor,
    // so fail the call instead.
    if (jni_ctx == nullptr || !jni_ctx->open_successes) {
        return Status::InternalError(
                "Java UDF executor is not initialized for this thread; "
                "JavaFunctionCall::open must succeed before execute");
    }
    SCOPED_TIMER(context->get_udf_execute_timer());

    // `input_table` owns the exported argument metadata whose address is handed to Java as
    // "meta_address", so it must stay alive until the evaluate() call below returns.
    std::unique_ptr<long[]> input_table;
    RETURN_IF_ERROR(JniDataBridge::to_java_table(&block, num_rows, arguments, input_table));
    auto input_table_schema = JniDataBridge::parse_table_schema(&block, arguments, true);
    std::map<String, String> input_params = {
            {"meta_address", std::to_string(reinterpret_cast<long>(input_table.get()))},
            {"required_fields", input_table_schema.first},
            {"columns_types", input_table_schema.second}};
    Jni::LocalObject input_map;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map));

    auto output_table_schema = JniDataBridge::parse_table_schema(&block, {result}, true);
    std::string output_nullable =
            block.get_by_position(result).type->is_nullable() ? "true" : "false";
    std::map<String, String> output_params = {{"is_nullable", output_nullable},
                                              {"required_fields", output_table_schema.first},
                                              {"columns_types", output_table_schema.second}};
    Jni::LocalObject output_map;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, output_params, &output_map));

    long output_address = 0;
    RETURN_IF_ERROR(jni_ctx->executor.call_long_method(env, jni_ctx->executor_evaluate_id)
                            .with_arg(input_map)
                            .with_arg(output_map)
                            .call(&output_address));

    return JniDataBridge::fill_block(&block, {result}, output_address);
}
118
119
/// Release the thread-local JniContext created by open(), if one is registered.
///
/// The JniContext owns resources whose release depends on this JavaFunctionCall, so they
/// must be freed before the JavaFunctionCall is destroyed.
///
/// If the caller is not running inside a bthread (bthread_self() == 0), the release runs
/// inline. Otherwise the `udf_close_bthread_count` metric is incremented, the release is
/// submitted to the udf_close_workers pthread pool, and this call waits for its Status.
/// `scope` is not consulted.
Status JavaFunctionCall::close(FunctionContext* context,
                               FunctionContext::FunctionStateScope scope) {
    auto release_ctx = [context]() -> Status {
        auto* ctx = reinterpret_cast<JniContext*>(
                context->get_function_state(FunctionContext::THREAD_LOCAL));
        if (ctx == nullptr) {
            return Status::OK();
        }
        RETURN_IF_ERROR(ctx->close());
        return Status::OK();
    };

    if (bthread_self() == 0) {
        // Not inside a bthread: release on the calling thread.
        return release_ctx();
    }

    // Inside a bthread: hand the release to the close-workers pthread pool and wait for it.
    DorisMetrics::instance()->udf_close_bthread_count->increment(1);
    auto job = std::make_shared<std::packaged_task<Status()>>(std::move(release_ctx));
    auto done = job->get_future();
    RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
            [job]() { (*job)(); }));
    RETURN_IF_ERROR(done.get());
    return Status::OK();
}
145
} // namespace doris