Coverage Report

Created: 2026-04-22 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_java_udf.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exprs/function/function_java_udf.h"
19
20
#include <bthread/bthread.h>
21
22
#include <memory>
23
#include <string>
24
25
#include "common/exception.h"
26
#include "common/metrics/doris_metrics.h"
27
#include "core/block/block.h"
28
#include "format/jni/jni_data_bridge.h"
29
#include "jni.h"
30
#include "runtime/exec_env.h"
31
#include "runtime/user_function_cache.h"
32
#include "util/jni-util.h"
33
34
// JNI names and descriptors used by JavaFunctionCall::open() to resolve the
// Java-side UDF executor class and its methods.

// Slash-separated binary name of the executor class.
const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
// Constructor descriptor: takes one byte[] argument.
const char* EXECUTOR_CTOR_SIGNATURE = "([B)V";
// "evaluate" descriptor: takes two java.util.Map arguments and returns a long.
const char* EXECUTOR_EVALUATE_SIGNATURE = "(Ljava/util/Map;Ljava/util/Map;)J";
// "close" descriptor: no arguments, returns void.
const char* EXECUTOR_CLOSE_SIGNATURE = "()V";
38
39
namespace doris {
40
41
// Constructs a Java UDF call wrapper.
//
// fn             Thrift description of the function; open() later reads its id,
//                hdfs_location and checksum and forwards it to the Java executor.
// argument_types types of the UDF arguments.
// return_type    type of the UDF result.
//
// The constructor only initializes fn_, _argument_types and _return_type from
// its arguments; no JNI work happens here.
JavaFunctionCall::JavaFunctionCall(const TFunction& fn, const DataTypes& argument_types,
                                   const DataTypePtr& return_type)
        : fn_(fn),
          _argument_types(argument_types),
          _return_type(return_type) {
}
44
45
4.99k
// Prepares the JNI state needed to evaluate this Java UDF.
//
// A JNIEnv is obtained for every scope, but only the THREAD_LOCAL scope does
// further work: a JniContext is created and registered on `context`, the
// executor class and its <init>/evaluate/close methods are resolved, and an
// executor object is constructed from the Thrift-serialized
// TJavaUdfExecutorCtorParams. The first failing step returns its Status
// immediately; in that case open_successes is never set to true.
Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // Nothing else to prepare for scopes other than THREAD_LOCAL.
    if (scope != FunctionContext::FunctionStateScope::THREAD_LOCAL) {
        return Status::OK();
    }

    SCOPED_TIMER(context->get_udf_execute_timer());
    auto jni_ctx = std::make_shared<JniContext>();
    // The state is registered before it is fully initialized, so it stays
    // reachable through `context` even if one of the steps below fails.
    context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);

    {
        std::string jar_path;
        auto udf_cache = UserFunctionCache::instance();
        TJavaUdfExecutorCtorParams executor_params;
        executor_params.__set_fn(fn_);

        // Resolve the local jar path only when both the file location and the
        // checksum are provided; otherwise no location is passed to the executor.
        const bool has_jar_info = !fn_.hdfs_location.empty() && !fn_.checksum.empty();
        if (has_jar_info) {
            RETURN_IF_ERROR(
                    udf_cache->get_jarpath(fn_.id, fn_.hdfs_location, fn_.checksum, &jar_path));
            executor_params.__set_location(jar_path);
        }

        // Look up the executor class and the three methods used during the
        // lifetime of this function: constructor, evaluate and close.
        RETURN_IF_ERROR(Jni::Util::find_class(env, EXECUTOR_CLASS, &jni_ctx->executor_cl));
        RETURN_IF_ERROR(jni_ctx->executor_cl.get_method(env, "<init>", EXECUTOR_CTOR_SIGNATURE,
                                                        &jni_ctx->executor_ctor_id));
        RETURN_IF_ERROR(jni_ctx->executor_cl.get_method(
                env, "evaluate", EXECUTOR_EVALUATE_SIGNATURE, &jni_ctx->executor_evaluate_id));
        RETURN_IF_ERROR(jni_ctx->executor_cl.get_method(env, "close", EXECUTOR_CLOSE_SIGNATURE,
                                                        &jni_ctx->executor_close_id));

        // Serialize the constructor parameters and instantiate the executor.
        Jni::LocalArray serialized_params;
        RETURN_IF_ERROR(Jni::Util::SerializeThriftMsg(env, &executor_params, &serialized_params));
        RETURN_IF_ERROR(jni_ctx->executor_cl.new_object(env, jni_ctx->executor_ctor_id)
                                .with_arg(serialized_params)
                                .call(&jni_ctx->executor));
    }

    // Reached only when every step above succeeded.
    jni_ctx->open_successes = true;
    return Status::OK();
}
84
85
// Evaluates the Java UDF over `num_rows` rows of `block`.
//
// The argument columns are exported through JniDataBridge::to_java_table and
// described to Java by a parameter map (table meta address, field names,
// column types). A second map describes the result column. The executor's
// evaluate method is called with both maps and returns an address that
// JniDataBridge::fill_block uses to populate the `result` column.
//
// Returns the first non-OK Status produced by any step, otherwise the Status
// of fill_block.
Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
                                      const ColumnNumbers& arguments, uint32_t result,
                                      size_t num_rows) const {
    JNIEnv* env = nullptr;
    RETURN_IF_ERROR(Jni::Env::Get(&env));

    // State registered by open() for the THREAD_LOCAL scope.
    auto* jni_ctx = reinterpret_cast<JniContext*>(
            context->get_function_state(FunctionContext::THREAD_LOCAL));
    SCOPED_TIMER(context->get_udf_execute_timer());

    // --- input side -------------------------------------------------------
    // `input_meta` owns the exported table meta and must stay alive until the
    // Java call below has returned, since only its address is handed over.
    std::unique_ptr<long[]> input_meta;
    RETURN_IF_ERROR(JniDataBridge::to_java_table(&block, num_rows, arguments, input_meta));
    auto input_schema = JniDataBridge::parse_table_schema(&block, arguments, true);
    const std::string input_meta_address =
            std::to_string(reinterpret_cast<long>(input_meta.get()));
    std::map<String, String> input_params = {{"meta_address", input_meta_address},
                                             {"required_fields", input_schema.first},
                                             {"columns_types", input_schema.second}};
    Jni::LocalObject input_map;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map));

    // --- output side ------------------------------------------------------
    auto output_schema = JniDataBridge::parse_table_schema(&block, {result}, true);
    std::string output_nullable;
    if (block.get_by_position(result).type->is_nullable()) {
        output_nullable = "true";
    } else {
        output_nullable = "false";
    }
    std::map<String, String> output_params = {{"is_nullable", output_nullable},
                                              {"required_fields", output_schema.first},
                                              {"columns_types", output_schema.second}};
    Jni::LocalObject output_map;
    RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, output_params, &output_map));

    // --- call into Java ---------------------------------------------------
    long output_address = 0;
    RETURN_IF_ERROR(jni_ctx->executor.call_long_method(env, jni_ctx->executor_evaluate_id)
                            .with_arg(input_map)
                            .with_arg(output_map)
                            .call(&output_address));

    return JniDataBridge::fill_block(&block, {result}, output_address);
}
119
120
// Releases the JNI state stored on `context` for the THREAD_LOCAL scope.
//
// The release runs inline when bthread_self() returns 0. Otherwise it is
// handed to the UDF close worker pool and this call blocks until the task has
// finished, returning the task's Status. `scope` is not consulted: the
// THREAD_LOCAL state is looked up regardless of which scope is being closed.
Status JavaFunctionCall::close(FunctionContext* context,
                               FunctionContext::FunctionStateScope scope) {
    auto release_jni_state = [context]() {
        auto* jni_ctx = reinterpret_cast<JniContext*>(
                context->get_function_state(FunctionContext::THREAD_LOCAL));
        // The JniContext owns resources whose release depends on
        // JavaFunctionCall, so they have to be released before
        // JavaFunctionCall is destroyed.
        if (!jni_ctx) {
            return Status::OK();
        }
        RETURN_IF_ERROR(jni_ctx->close());
        return Status::OK();
    };

    if (bthread_self() != 0) {
        // Presumably running on a bthread (non-zero id): count the occurrence
        // and run the release on the close-workers pthread pool instead.
        DorisMetrics::instance()->udf_close_bthread_count->increment(1);
        auto close_task =
                std::make_shared<std::packaged_task<Status()>>(std::move(release_jni_state));
        auto close_result = close_task->get_future();
        RETURN_IF_ERROR(ExecEnv::GetInstance()->udf_close_workers_pool()->submit_func(
                [close_task]() { (*close_task)(); }));
        // Wait for the worker to finish so `context` is still valid while the
        // task reads it, and propagate the task's outcome.
        RETURN_IF_ERROR(close_result.get());
        return Status::OK();
    }

    return release_jni_state();
}
146
} // namespace doris