Coverage Report

Created: 2026-03-13 12:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_client.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_client.h"
19
20
#include "arrow/flight/client.h"
21
#include "arrow/flight/server.h"
22
#include "common/compiler_util.h"
23
#include "common/config.h"
24
#include "common/status.h"
25
#include "format/arrow/arrow_utils.h"
26
#include "udf/python/python_udf_meta.h"
27
#include "udf/python/python_udf_runtime.h"
28
29
namespace doris {
30
31
0
// Connects this client to the Python server referenced by `process` and opens the
// bidirectional Flight exchange whose reader/writer are used by the other methods.
//
// func_meta: its `client_type` selects the operation name used in messages, and its
//            JSON serialization is sent as the Flight command descriptor.
// process:   provides the server URI via `get_uri()`; stored in `_process` on success.
//
// Returns OK on success. Returns InternalError if the client is already initialized,
// if `process` is null, or if the client type is not UDF/UDAF/UDTF. Otherwise returns
// the first error produced while parsing the location, connecting, serializing the
// metadata, or starting the exchange. `_inited` is only set once every step succeeded.
Status PythonClient::init(const PythonUDFMeta& func_meta, ProcessPtr process) {
    if (_inited) {
        return Status::InternalError("PythonClient has already been initialized");
    }

    // `process` is dereferenced below; reject a null handle instead of crashing.
    if (!process) {
        return Status::InternalError("PythonClient cannot be initialized with a null process");
    }

    // Set operation name based on client type
    switch (func_meta.client_type) {
    case PythonClientType::UDF:
        _operation_name = "Python UDF";
        break;
    case PythonClientType::UDAF:
        _operation_name = "Python UDAF";
        break;
    case PythonClientType::UDTF:
        _operation_name = "Python UDTF";
        break;
    default:
        return Status::InternalError("Invalid Python client type");
    }

    // Parse and connect to Python server location
    arrow::flight::Location location;
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(location,
                                        arrow::flight::Location::Parse(process->get_uri()));
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(_arrow_client, FlightClient::Connect(location));

    // Serialize function metadata to JSON command
    std::string command;
    RETURN_IF_ERROR(func_meta.serialize_to_json(&command));

    // Create Flight descriptor and establish bidirectional streaming
    FlightDescriptor descriptor = FlightDescriptor::Command(command);
    arrow::flight::FlightClient::DoExchangeResult exchange_res;
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(exchange_res, _arrow_client->DoExchange(descriptor));

    // Commit the stream endpoints and the process handle only after every step succeeded.
    _reader = std::move(exchange_res.reader);
    _writer = std::move(exchange_res.writer);
    _process = std::move(process);
    _inited = true;

    return Status::OK();
}
73
74
0
// Closes the exchange writer (if one is still open) and releases every resource held
// by this client, returning it to the uninitialized state.
//
// Always returns OK: a failure while closing the writer is logged, not propagated.
//
// Cleanup runs even when `_writer` is already null. handle_error() resets the writer
// and reader but leaves `_inited`, `_arrow_client` and `_process` untouched, so an
// early return on a missing writer would keep those alive and leave the client
// permanently marked as initialized.
Status PythonClient::close() {
    if (_writer) {
        auto writer_res = _writer->Close();
        if (!writer_res.ok()) {
            // Don't propagate error from close, just log it
            LOG(WARNING) << "Error closing Python client writer: " << writer_res.message();
        }
    }

    _inited = false;
    _begin = false;
    _arrow_client.reset();
    _writer.reset();
    _reader.reset();
    _process.reset();

    return Status::OK();
}
94
95
0
// Turns a failed Arrow status into a Doris RuntimeError.
//
// Side effects: drops the stream writer and reader, and logs the complete, untrimmed
// Arrow message at ERROR level prefixed with the operation name.
//
// The returned status carries the message cut at the first occurrence of
// "The above exception was the direct cause" (when present) and passed through trim().
Status PythonClient::handle_error(arrow::Status status) {
    DCHECK(!status.ok());

    // Release both halves of the exchange.
    _writer.reset();
    _reader.reset();

    // Log the full text before shortening it for the caller.
    std::string error_text = status.message();
    LOG(ERROR) << _operation_name << " error: " << error_text;

    // Keep only the part that precedes the chained-exception marker.
    const size_t cut_at = error_text.find("The above exception was the direct cause");
    if (cut_at != std::string::npos) {
        error_text.erase(cut_at);
    }

    return Status::RuntimeError(trim(error_text));
}
114
115
0
// Starts the outgoing stream with `schema` the first time it is called; later calls
// are no-ops while `_begin` remains set.
//
// Returns OK if the stream was already begun or Begin() succeeded, InternalError if
// there is no writer (client not initialized, or the stream was dropped by
// handle_error()), and the status produced by handle_error() if Begin() fails.
Status PythonClient::begin_stream(const std::shared_ptr<arrow::Schema>& schema) {
    if (UNLIKELY(!_begin)) {
        // Guard the dereference below: `_writer` is null before init() and after an error.
        if (!_writer) {
            return Status::InternalError(
                    "Python client writer is not available: client is not initialized or the "
                    "stream was closed after an error");
        }
        auto begin_res = _writer->Begin(schema);
        if (!begin_res.ok()) {
            return handle_error(begin_res);
        }
        _begin = true;
    }
    return Status::OK();
}
125
126
0
// Sends one record batch over the exchange writer.
//
// Returns OK on success, InternalError if there is no writer (client not initialized,
// or the stream was dropped by handle_error()), and the status produced by
// handle_error() if the write fails.
Status PythonClient::write_batch(const arrow::RecordBatch& input) {
    // `_begin` stays set after handle_error() resets the writer, so begin_stream() does
    // not catch that case; check here before dereferencing.
    if (!_writer) {
        return Status::InternalError(
                "Python client writer is not available: client is not initialized or the "
                "stream was closed after an error");
    }
    auto write_res = _writer->WriteRecordBatch(input);
    if (!write_res.ok()) {
        return handle_error(write_res);
    }
    return Status::OK();
}
133
134
0
// Reads the next chunk from the exchange reader and hands its record batch to `*output`.
//
// output: receives the batch on success; left untouched on any error.
//
// Returns OK on success, InternalError if there is no reader (client not initialized,
// or the stream was dropped by handle_error()) or if the chunk carries no record
// batch, and the status produced by handle_error() if the read itself fails.
Status PythonClient::read_batch(std::shared_ptr<arrow::RecordBatch>* output) {
    // `_reader` is null before init() and after handle_error(); avoid dereferencing it.
    if (!_reader) {
        return Status::InternalError(
                "Python client reader is not available: client is not initialized or the "
                "stream was closed after an error");
    }

    auto read_res = _reader->Next();
    if (!read_res.ok()) {
        return handle_error(read_res.status());
    }

    arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
    if (!chunk.data) {
        return Status::InternalError("Received null RecordBatch from {} server", _operation_name);
    }

    *output = std::move(chunk.data);
    return Status::OK();
}
148
149
} // namespace doris