Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_rpc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exprs/function/function_rpc.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/function_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <algorithm>
26
#include <memory>
27
#include <utility>
28
29
#include "common/status.h"
30
#include "core/column/column.h"
31
#include "core/data_type_serde/data_type_serde.h"
32
#include "runtime/exec_env.h"
33
#include "util/brpc_client_cache.h"
34
35
namespace doris {
36
#include "common/compile_check_begin.h"
37
0
// Builds the RPC function wrapper from its thrift descriptor: records the remote symbol
// and the server address (taken from the descriptor's hdfs_location field), prepares the
// signature string used in error messages, and looks up a brpc client for that address.
RPCFnImpl::RPCFnImpl(const TFunction& fn) : _fn(fn) {
    const auto& symbol = _fn.scalar_fn.symbol;
    const auto& location = _fn.hdfs_location;

    _function_name = symbol;
    _server_addr = location;
    // Signature layout: "<function name>: [<server address>/<symbol>]".
    _signature = fmt::format("{}: [{}/{}]", _fn.name.function_name, location, symbol);
    // The lookup result is stored as-is; vec_call() compares it against nullptr before use.
    _client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server_addr);
}
44
45
// Calls the remote function for the given argument columns and stores the first returned
// value set into column `result` of `block`.
//
// @param context           unused by this implementation.
// @param block             block holding the argument columns; its `result` column is replaced.
// @param arguments         positions of the argument columns inside `block`.
// @param result            position of the output column inside `block`.
// @param input_rows_count  upper bound on the number of rows serialized per argument.
// @return Status::OK() on success. Status::InternalError when no client is available, when
//         brpc reports a failure, when the response carries no status, when the remote status
//         code is non-zero, or when the response carries no result. Errors from argument
//         serialization or result deserialization are propagated unchanged.
Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
                           uint32_t result, size_t input_rows_count) {
    if (_client == nullptr) {
        return Status::InternalError(
                "call to rpc function {} failed: init rpc error, server addr = {}", _signature,
                _server_addr);
    }

    PFunctionCallRequest request;
    request.set_function_name(_function_name);
    RETURN_IF_ERROR(_convert_block_to_proto(block, arguments, input_rows_count, &request));

    PFunctionCallResponse response;
    brpc::Controller cntl;
    // A null done-closure is passed, so the response is inspected right after the call.
    _client->fn_call(&cntl, &request, &response, nullptr);
    if (cntl.Failed()) {
        return Status::InternalError("call to rpc function {} failed: {}", _signature,
                                     cntl.ErrorText());
    }
    if (!response.has_status()) {
        return Status::InternalError("call rpc function {} failed: status or result is not set.",
                                     _signature);
    }
    // Check the remote status before looking at the results: a server that failed may
    // legitimately return no result, and its own error is more useful than the generic
    // "not set" message below.
    if (response.status().status_code() != 0) {
        return Status::InternalError("call to rpc function {} failed: {}", _signature,
                                     response.status().DebugString());
    }
    if (response.result_size() == 0) {
        return Status::InternalError("call rpc function {} failed: status or result is not set.",
                                     _signature);
    }
    RETURN_IF_ERROR(_convert_to_block(block, response.result(0), result));
    return Status::OK();
}
73
74
// Serializes the argument columns of `block` into `request`, one PValues entry per
// argument, in the order given by `arguments`.
//
// At most min(block.rows(), input_rows_count) rows are written per column. The has_null
// flag is computed on the column as stored in the block; the column is then expanded via
// convert_to_full_column_if_const() before being handed to the type's serde.
// Returns the first serde error encountered, or Status::OK().
Status RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers& arguments,
                                          size_t input_rows_count, PFunctionCallRequest* request) {
    const size_t rows_to_send = std::min(block.rows(), input_rows_count);
    for (const size_t position : arguments) {
        PValues* serialized = request->add_args();
        ColumnWithTypeAndName& source = block.get_by_position(position);

        const bool contains_null = source.column->has_null(0, rows_to_send);
        serialized->set_has_null(contains_null);

        auto full_column = source.column->convert_to_full_column_if_const();
        auto serde = source.type->get_serde();
        RETURN_IF_ERROR(serde->write_column_to_pb(*full_column, *serialized, 0, rows_to_send));
    }
    return Status::OK();
}
86
87
0
// Decodes `result` into a freshly created column of the type registered at position `pos`
// in `block`, then replaces the column at that position with the decoded one.
// Returns the serde error if decoding fails (the block is left untouched in that case),
// otherwise Status::OK().
Status RPCFnImpl::_convert_to_block(Block& block, const PValues& result, size_t pos) {
    auto result_type = block.get_data_type(pos);
    auto decoded = result_type->create_column();
    RETURN_IF_ERROR(result_type->get_serde()->read_column_from_pb(*decoded, result));
    block.replace_by_position(pos, std::move(decoded));
    return Status::OK();
}
95
96
// Stores the function descriptor together with its argument and return types; no RPC
// resources are created here (see open()).
FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types,
                         const DataTypePtr& return_type)
        : _argument_types(argument_types),
          _return_type(return_type),
          _tfn(fn) {
}
99
100
0
// Creates the RPCFnImpl for this function and registers it as the FRAGMENT_LOCAL
// function state on `context`. Any other scope is a no-op that returns Status::OK().
// Returns Status::InternalError when the freshly built RPCFnImpl reports it is unavailable.
Status FunctionRPC::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
    if (scope != FunctionContext::FRAGMENT_LOCAL) {
        return Status::OK();
    }

    auto rpc_impl = std::make_shared<RPCFnImpl>(_tfn);
    if (!rpc_impl->available()) {
        return Status::InternalError("rpc env init error");
    }
    context->set_function_state(FunctionContext::FRAGMENT_LOCAL, rpc_impl);
    return Status::OK();
}
110
} // namespace doris