be/src/service/arrow_flight/flight_sql_service.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/arrow_flight/flight_sql_service.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <arrow/status.h> |
22 | | |
23 | | #include <memory> |
24 | | |
25 | | #include "arrow/flight/sql/server.h" |
26 | | #include "format/arrow/arrow_utils.h" |
27 | | #include "service/arrow_flight/arrow_flight_batch_reader.h" |
28 | | #include "service/arrow_flight/flight_sql_info.h" |
29 | | #include "service/backend_options.h" |
30 | | #include "util/uid_util.h" |
31 | | #include "util/url_coding.h" |
32 | | |
33 | | namespace doris::flight { |
34 | | |
35 | | class FlightSqlServer::Impl { |
36 | | private: |
37 | | // Create a Ticket that combines a sql and a query ID. |
38 | | arrow::Result<arrow::flight::Ticket> encode_ticket(const std::string& sql, |
39 | 0 | const std::string& query_id) { |
40 | 0 | std::string query = query_id; |
41 | 0 | query += ':'; |
42 | 0 | query += sql; |
43 | 0 | ARROW_ASSIGN_OR_RAISE(auto ticket, arrow::flight::sql::CreateStatementQueryTicket(query)); |
44 | 0 | return arrow::flight::Ticket {std::move(ticket)}; |
45 | 0 | } |
46 | | |
47 | 0 | arrow::Result<std::shared_ptr<QueryStatement>> decode_ticket(const std::string& ticket) { |
48 | 0 | std::vector<std::string> fields = absl::StrSplit(ticket, "&"); |
49 | 0 | if (fields.size() != 4) { |
50 | 0 | return arrow::Status::Invalid( |
51 | 0 | fmt::format("Malformed ticket: {}, size: {}", ticket, fields.size())); |
52 | 0 | } |
53 | | |
54 | 0 | std::vector<std::string> str = absl::StrSplit(fields[0], "-"); |
55 | 0 | if (str.size() != 2) { |
56 | 0 | return arrow::Status::Invalid("Malformed ticket: {}, missing query id: {}", ticket, |
57 | 0 | fields[0]); |
58 | 0 | } |
59 | | |
60 | 0 | TUniqueId queryid; |
61 | 0 | from_hex(&queryid.hi, str[0]); |
62 | 0 | from_hex(&queryid.lo, str[1]); |
63 | 0 | TNetworkAddress result_addr; |
64 | 0 | result_addr.hostname = fields[1]; |
65 | 0 | result_addr.port = std::stoi(fields[2]); |
66 | 0 | const std::string& sql_base64 = fields[3]; |
67 | 0 | std::string sql; |
68 | 0 | base64_decode(sql_base64, &sql); |
69 | 0 | std::shared_ptr<QueryStatement> statement = |
70 | 0 | std::make_shared<QueryStatement>(queryid, result_addr, sql); |
71 | 0 | return statement; |
72 | 0 | } |
73 | | |
74 | | public: |
75 | | explicit Impl() = default; |
76 | | |
77 | | ~Impl() = default; |
78 | | |
79 | | arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> DoGetStatement( |
80 | | const arrow::flight::ServerCallContext& context, |
81 | 0 | const arrow::flight::sql::StatementQueryTicket& command) { |
82 | 0 | ARROW_ASSIGN_OR_RAISE(auto statement, decode_ticket(command.statement_handle)); |
83 | | // if IP:BrpcPort in the Ticket is not current BE node, |
84 | | // pulls the query result Block from the BE node specified by IP:BrpcPort, |
85 | | // converts it to Arrow Batch and returns it to ADBC client. |
86 | | // use brpc to transmit blocks between BEs. |
87 | 0 | if (statement->result_addr.hostname == BackendOptions::get_localhost() && |
88 | 0 | statement->result_addr.port == config::brpc_port) { |
89 | 0 | std::shared_ptr<ArrowFlightBatchLocalReader> reader; |
90 | 0 | ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchLocalReader::Create(statement)); |
91 | 0 | return std::make_unique<arrow::flight::RecordBatchStream>(reader); |
92 | 0 | } else { |
93 | 0 | std::shared_ptr<ArrowFlightBatchRemoteReader> reader; |
94 | 0 | ARROW_ASSIGN_OR_RAISE(reader, ArrowFlightBatchRemoteReader::Create(statement)); |
95 | 0 | return std::make_unique<arrow::flight::RecordBatchStream>(reader); |
96 | 0 | } |
97 | 0 | } |
98 | | }; |
99 | | |
100 | 0 | FlightSqlServer::FlightSqlServer(std::shared_ptr<Impl> impl) : _impl(std::move(impl)) {} |
101 | | |
102 | 0 | arrow::Result<std::shared_ptr<FlightSqlServer>> FlightSqlServer::create() { |
103 | 0 | std::shared_ptr<Impl> impl = std::make_shared<Impl>(); |
104 | |
|
105 | 0 | std::shared_ptr<FlightSqlServer> result(new FlightSqlServer(std::move(impl))); |
106 | 0 | for (const auto& id_to_result : GetSqlInfoResultMap()) { |
107 | 0 | result->RegisterSqlInfo(id_to_result.first, id_to_result.second); |
108 | 0 | } |
109 | |
|
110 | 0 | return result; |
111 | 0 | } |
112 | | |
113 | 0 | FlightSqlServer::~FlightSqlServer() { |
114 | 0 | static_cast<void>(join()); |
115 | 0 | } |
116 | | |
117 | | arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> FlightSqlServer::DoGetStatement( |
118 | | const arrow::flight::ServerCallContext& context, |
119 | 0 | const arrow::flight::sql::StatementQueryTicket& command) { |
120 | 0 | return _impl->DoGetStatement(context, command); |
121 | 0 | } |
122 | | |
123 | 0 | Status FlightSqlServer::init(int port) { |
124 | 0 | if (port == -1) { |
125 | 0 | LOG(INFO) << "Arrow Flight Service not start"; |
126 | 0 | return Status::OK(); |
127 | 0 | } |
128 | 0 | _inited = true; |
129 | 0 | arrow::flight::Location bind_location; |
130 | 0 | RETURN_DORIS_STATUS_IF_ERROR( |
131 | 0 | arrow::flight::Location::ForGrpcTcp(BackendOptions::get_service_bind_address(), port) |
132 | 0 | .Value(&bind_location)); |
133 | 0 | arrow::flight::FlightServerOptions flight_options(bind_location); |
134 | | |
135 | | // Not authenticated in BE flight server. |
136 | | // After the authentication between the ADBC Client and the FE flight server is completed, |
137 | | // the FE flight server will put the query id in the Ticket and send it back to the Client. |
138 | | // When the Client uses the Ticket to fetch data from the BE flight server, the BE flight |
139 | | // server will verify the query id, this step is equivalent to authentication. |
140 | 0 | _header_middleware = std::make_shared<NoOpHeaderAuthServerMiddlewareFactory>(); |
141 | 0 | _bearer_middleware = std::make_shared<NoOpBearerAuthServerMiddlewareFactory>(); |
142 | 0 | flight_options.auth_handler = std::make_unique<arrow::flight::NoOpAuthHandler>(); |
143 | 0 | flight_options.middleware.push_back({"header-auth-server", _header_middleware}); |
144 | 0 | flight_options.middleware.push_back({"bearer-auth-server", _bearer_middleware}); |
145 | |
|
146 | 0 | RETURN_DORIS_STATUS_IF_ERROR(Init(flight_options)); |
147 | 0 | LOG(INFO) << "Arrow Flight Service bind to host: " << BackendOptions::get_service_bind_address() |
148 | 0 | << ", port: " << port; |
149 | 0 | return Status::OK(); |
150 | 0 | } |
151 | | |
152 | 0 | Status FlightSqlServer::join() { |
153 | 0 | if (!_inited) { |
154 | | // Flight not inited, not need shutdown |
155 | 0 | return Status::OK(); |
156 | 0 | } |
157 | 0 | RETURN_DORIS_STATUS_IF_ERROR(Shutdown()); |
158 | 0 | return Status::OK(); |
159 | 0 | } |
160 | | |
161 | | } // namespace doris::flight |