Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_client.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_client.h"
19
20
#include "arrow/flight/client.h"
21
#include "arrow/flight/server.h"
22
#include "common/compiler_util.h"
23
#include "common/config.h"
24
#include "common/status.h"
25
#include "format/arrow/arrow_utils.h"
26
#include "udf/python/python_udf_meta.h"
27
#include "udf/python/python_udf_runtime.h"
28
29
namespace doris {
30
31
5.78k
// Connects this client to the Python server described by `process` and opens the
// Arrow Flight DoExchange stream whose reader/writer the other methods use.
//
// @param func_meta  Function metadata. Its client_type selects the operation name used
//                   in log and error messages, and its JSON serialization is sent as
//                   the Flight command descriptor.
// @param process    Handle whose get_uri() is parsed as the server location. It is
//                   stored in the client only after every step has succeeded.
// @return Status::OK() on success. InternalError if the client is already initialized,
//         if `process` is null, or if the client type is not UDF/UDAF/UDTF. Failures
//         while parsing the URI, connecting, serializing the metadata or starting the
//         exchange are returned through the RETURN_* macros.
Status PythonClient::init(const PythonUDFMeta& func_meta, ProcessPtr process) {
    if (_inited) {
        return Status::InternalError("PythonClient has already been initialized");
    }
    // `process` is dereferenced below, so reject a null handle up front.
    if (process == nullptr) {
        return Status::InternalError("PythonClient cannot be initialized without a process");
    }

    // Set operation name based on client type
    switch (func_meta.client_type) {
    case PythonClientType::UDF:
        _operation_name = "Python UDF";
        break;
    case PythonClientType::UDAF:
        _operation_name = "Python UDAF";
        break;
    case PythonClientType::UDTF:
        _operation_name = "Python UDTF";
        break;
    default:
        return Status::InternalError("Invalid Python client type");
    }

    // Parse and connect to Python server location
    arrow::flight::Location location;
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(location,
                                        arrow::flight::Location::Parse(process->get_uri()));
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(_arrow_client, FlightClient::Connect(location));

    // Serialize function metadata to JSON command
    std::string command;
    RETURN_IF_ERROR(func_meta.serialize_to_json(&command));

    // Create Flight descriptor and establish bidirectional streaming
    FlightDescriptor descriptor = FlightDescriptor::Command(command);
    arrow::flight::FlightClient::DoExchangeResult exchange_res;
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(exchange_res, _arrow_client->DoExchange(descriptor));

    // Commit the stream handles and mark the client usable only once everything worked.
    _reader = std::move(exchange_res.reader);
    _writer = std::move(exchange_res.writer);
    _process = std::move(process);
    _inited = true;

    return Status::OK();
}
73
74
6.09k
// Shuts the client down and returns it to the uninitialized state.
//
// The writer is closed only when the client is initialized and still owns one.
// All state is then cleared unconditionally, so that a client whose writer was
// already dropped by handle_error() still releases its Flight client and process
// handle and can be initialized again.
//
// @return Always Status::OK(); a failure while closing the writer is logged, not returned.
Status PythonClient::close() {
    if (_inited && _writer) {
        auto writer_res = _writer->Close();
        if (!writer_res.ok()) {
            // Don't propagate error from close, just log it
            LOG(WARNING) << "Error closing Python client writer: " << writer_res.message();
        }
    }

    // Resetting members that are already empty is harmless, so do it on every path.
    _inited = false;
    _begin = false;
    _arrow_client.reset();
    _writer.reset();
    _reader.reset();
    _process.reset();

    return Status::OK();
}
94
95
2
// Converts a failed Arrow status into a Doris RuntimeError and drops the stream
// reader/writer held by this client.
//
// The full Arrow message is logged at ERROR level. The returned message is cut off
// at the first "The above exception was the direct cause" marker, if present, and
// then passed through trim().
//
// @param status  Arrow status of the failed call; expected to be non-OK (DCHECKed).
// @return Status::RuntimeError carrying the shortened message.
Status PythonClient::handle_error(arrow::Status status) {
    DCHECK(!status.ok());

    // Release the stream handles before reporting the failure.
    _writer.reset();
    _reader.reset();

    // Log the untruncated text first so the whole traceback stays in the log.
    std::string text = status.message();
    LOG(ERROR) << _operation_name << " error: " << text;

    // Keep only the part ahead of the chained-exception marker for the caller.
    const size_t cut = text.find("The above exception was the direct cause");
    if (cut != std::string::npos) {
        text.erase(cut);
    }

    return Status::RuntimeError(trim(text));
}
114
115
781
// Starts the outgoing stream with `schema` the first time it is called; later
// calls are no-ops while the stream is already begun.
//
// @param schema  Arrow schema passed to the writer's Begin().
// @return Status::OK() if the stream is (already) begun; InternalError if the client
//         has no writer (before init() or after handle_error() dropped it); otherwise
//         the RuntimeError produced by handle_error() when Begin() fails.
Status PythonClient::begin_stream(const std::shared_ptr<arrow::Schema>& schema) {
    // Checked before `_begin` so that a stream whose writer was dropped after a
    // failure is reported instead of being treated as still open.
    if (UNLIKELY(!_writer)) {
        return Status::InternalError(
                "PythonClient writer is not available (not initialized or already failed)");
    }
    if (UNLIKELY(!_begin)) {
        auto begin_res = _writer->Begin(schema);
        if (!begin_res.ok()) {
            return handle_error(begin_res);
        }
        _begin = true;
    }
    return Status::OK();
}
125
126
781
// Sends one record batch to the Python server over the exchange stream.
//
// @param input  Batch handed to the writer's WriteRecordBatch().
// @return Status::OK() on success; InternalError if the client has no writer (before
//         init() or after handle_error() dropped it); otherwise the RuntimeError
//         produced by handle_error() when the write fails.
Status PythonClient::write_batch(const arrow::RecordBatch& input) {
    // The writer is null until init() succeeds and again after handle_error().
    if (UNLIKELY(!_writer)) {
        return Status::InternalError(
                "PythonClient writer is not available (not initialized or already failed)");
    }
    auto write_res = _writer->WriteRecordBatch(input);
    if (!write_res.ok()) {
        return handle_error(write_res);
    }
    return Status::OK();
}
133
134
781
// Reads the next record batch returned by the Python server.
//
// @param output  Receives the batch on success; not written on failure.
// @return Status::OK() with *output set; InternalError if the client has no reader
//         (before init() or after handle_error() dropped it) or if the received chunk
//         carries no data; otherwise the RuntimeError produced by handle_error() when
//         the read fails.
Status PythonClient::read_batch(std::shared_ptr<arrow::RecordBatch>* output) {
    // The reader is null until init() succeeds and again after handle_error().
    if (UNLIKELY(!_reader)) {
        return Status::InternalError(
                "PythonClient reader is not available (not initialized or already failed)");
    }
    auto read_res = _reader->Next();
    if (!read_res.ok()) {
        return handle_error(read_res.status());
    }

    arrow::flight::FlightStreamChunk chunk = std::move(*read_res);
    // A chunk without data is treated as an error rather than handed to the caller.
    if (!chunk.data) {
        return Status::InternalError("Received null RecordBatch from {} server", _operation_name);
    }

    *output = std::move(chunk.data);
    return Status::OK();
}
148
149
} // namespace doris