Coverage Report

Created: 2026-03-30 20:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_client.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <string_view>
21
22
#include <arrow/status.h>
23
24
#include "arrow/flight/client.h"
25
#include "common/status.h"
26
#include "format/arrow/arrow_utils.h"
27
#include "udf/python/python_udf_meta.h"
28
#include "udf/python/python_udf_runtime.h"
29
30
namespace doris {
31
32
/**
33
 * Base class for Python UDF/UDAF/UDTF clients (non-copyable, virtual destructor)
34
 * 
35
 * Provides common functionality for communicating with Python server via Arrow Flight:
36
 * - Connection management (init() / close())
37
 * - Stream initialization (begin_stream(), then write_batch() / read_batch())
38
 * - Error handling (handle_error(), handle_error_message(), handle_chunk_metadata())
39
 * - Process lifecycle management (holds the ProcessPtr handed to init())
40
 */
41
class PythonClient {
42
public:
43
    using FlightDescriptor = arrow::flight::FlightDescriptor;
44
    using FlightClient = arrow::flight::FlightClient;
45
    using FlightStreamWriter = arrow::flight::FlightStreamWriter;
46
    using FlightStreamReader = arrow::flight::FlightStreamReader;
47
48
6.38k
    PythonClient() = default;
49
6.37k
    virtual ~PythonClient() = default; // virtual: allows deletion through a base-class pointer
50
51
    /**
52
     * Initialize connection to Python server
53
     * @param func_meta Function metadata (contains client_type for operation name)
54
     * @param process Python process handle (taken by value)
55
     * @return Status
56
     */
57
    Status init(const PythonUDFMeta& func_meta, ProcessPtr process);
58
59
    /**
60
     * Close connection and cleanup resources
61
     * @return Status
62
     */
63
    Status close();
64
65
    /**
66
     * Handle Arrow Flight error
67
     * @param status Arrow status (taken by value)
68
     * @return Doris Status with formatted error message
69
     */
70
    Status handle_error(arrow::Status status);
71
72
    /**
73
     * Handle application-level error returned via Flight app_metadata.
74
     * @param msg Error message (non-owning view)
75
     * @return Doris Status with formatted error message
76
     */
77
    Status handle_error_message(std::string_view msg);
78
79
    /**
80
     * Get process information for debugging
81
     * @return _process->to_string(), or the literal "null" when no process is held
82
     */
83
1.29k
    std::string print_process() const { return _process ? _process->to_string() : "null"; }
84
85
    /**
86
     * Get the underlying Python process
87
     * @return Copy of the held process pointer (may be null, see print_process())
88
     */
89
1.97k
    ProcessPtr get_process() const { return _process; }
90
91
protected:
92
    /**
93
     * Check whether a Flight chunk carries application-level error metadata.
94
     * @param chunk Flight response chunk
95
     * @return RuntimeError if metadata encodes a Python-side business error
96
     */
97
    Status handle_chunk_metadata(const arrow::flight::FlightStreamChunk& chunk);
98
99
    /**
100
     * Begin Flight stream with schema (called only once per stream)
101
     * @param schema Input schema
102
     * @return Status
103
     */
104
    Status begin_stream(const std::shared_ptr<arrow::Schema>& schema);
105
106
    /**
107
     * Write RecordBatch to server
108
     * @param input Input RecordBatch
109
     * @return Status
110
     */
111
    Status write_batch(const arrow::RecordBatch& input);
112
113
    /**
114
     * Read RecordBatch from server
115
     * @param output Out-parameter for the RecordBatch that was read
116
     * @return Status
117
     */
118
    Status read_batch(std::shared_ptr<arrow::RecordBatch>* output);
119
120
    // Common state (protected, so subclasses can access it directly)
121
    bool _inited = false;        // presumably set once init() has succeeded; confirm in the .cpp
122
    bool _begin = false;         // Track if the stream has been begun (presumably by begin_stream(); confirm)
123
    std::string _operation_name; // Operation name for error messages
124
    std::unique_ptr<FlightClient> _arrow_client; // Owned Arrow Flight client
125
    std::unique_ptr<FlightStreamWriter> _writer; // Owned Flight stream writer
126
    std::unique_ptr<FlightStreamReader> _reader; // Owned Flight stream reader
127
    ProcessPtr _process; // Python process handle; may be null (see print_process())
128
129
private:
130
    DISALLOW_COPY_AND_ASSIGN(PythonClient);
131
};
132
133
} // namespace doris