/root/doris/be/src/service/internal_service.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <brpc/closure_guard.h> |
21 | | #include <gen_cpp/internal_service.pb.h> |
22 | | |
23 | | #include <string> |
24 | | |
25 | | #include "common/status.h" |
26 | | #include "util/work_thread_pool.hpp" |
27 | | |
28 | | namespace google::protobuf { |
29 | | class Closure; |
30 | | class RpcController; |
31 | | } // namespace google::protobuf |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | class StorageEngine; |
36 | | class ExecEnv; |
37 | | class PHandShakeRequest; |
38 | | class PHandShakeResponse; |
39 | | class RuntimeState; |
40 | | |
41 | | template <typename T> |
42 | | concept CanCancel = requires(T* response) { response->mutable_status(); }; |
43 | | |
44 | | template <typename T> |
45 | 0 | void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { |
46 | 0 | brpc::ClosureGuard closure_guard(done); |
47 | 0 | LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info(); |
48 | 0 | } Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE |
49 | | |
50 | | template <CanCancel T> |
51 | 0 | void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) { |
52 | 0 | brpc::ClosureGuard closure_guard(done); |
53 | | // Should use status to generate protobuf message, because it will encoding Backend Info |
54 | | // into the error message and then we could know which backend's pool is full. |
55 | 0 | Status st = Status::Error<TStatusCode::CANCELLED>( |
56 | 0 | "fail to offer request to the work pool, pool={}", pool.get_info()); |
57 | 0 | st.to_protobuf(response->mutable_status()); |
58 | 0 | LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool=" |
59 | 0 | << pool.get_info(); |
60 | 0 | } Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_16PFetchDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE Unexecuted instantiation: _ZN5doris12offer_failedINS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE |
61 | | |
62 | | class PInternalService : public PBackendService { |
63 | | public: |
64 | | PInternalService(ExecEnv* exec_env); |
65 | | ~PInternalService() override; |
66 | | |
67 | | void transmit_data(::google::protobuf::RpcController* controller, |
68 | | const ::doris::PTransmitDataParams* request, |
69 | | ::doris::PTransmitDataResult* response, |
70 | | ::google::protobuf::Closure* done) override; |
71 | | |
72 | | void transmit_data_by_http(::google::protobuf::RpcController* controller, |
73 | | const ::doris::PEmptyRequest* request, |
74 | | ::doris::PTransmitDataResult* response, |
75 | | ::google::protobuf::Closure* done) override; |
76 | | |
77 | | void exec_plan_fragment(google::protobuf::RpcController* controller, |
78 | | const PExecPlanFragmentRequest* request, |
79 | | PExecPlanFragmentResult* result, |
80 | | google::protobuf::Closure* done) override; |
81 | | |
82 | | void exec_plan_fragment_prepare(google::protobuf::RpcController* controller, |
83 | | const PExecPlanFragmentRequest* request, |
84 | | PExecPlanFragmentResult* result, |
85 | | google::protobuf::Closure* done) override; |
86 | | |
87 | | void exec_plan_fragment_start(google::protobuf::RpcController* controller, |
88 | | const PExecPlanFragmentStartRequest* request, |
89 | | PExecPlanFragmentResult* result, |
90 | | google::protobuf::Closure* done) override; |
91 | | |
92 | | void cancel_plan_fragment(google::protobuf::RpcController* controller, |
93 | | const PCancelPlanFragmentRequest* request, |
94 | | PCancelPlanFragmentResult* result, |
95 | | google::protobuf::Closure* done) override; |
96 | | |
97 | | void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request, |
98 | | PFetchDataResult* result, google::protobuf::Closure* done) override; |
99 | | |
100 | | void fetch_arrow_data(google::protobuf::RpcController* controller, |
101 | | const PFetchArrowDataRequest* request, PFetchArrowDataResult* result, |
102 | | google::protobuf::Closure* done) override; |
103 | | |
104 | | void outfile_write_success(google::protobuf::RpcController* controller, |
105 | | const POutfileWriteSuccessRequest* request, |
106 | | POutfileWriteSuccessResult* result, |
107 | | google::protobuf::Closure* done) override; |
108 | | |
109 | | void fetch_table_schema(google::protobuf::RpcController* controller, |
110 | | const PFetchTableSchemaRequest* request, |
111 | | PFetchTableSchemaResult* result, |
112 | | google::protobuf::Closure* done) override; |
113 | | |
114 | | void fetch_arrow_flight_schema(google::protobuf::RpcController* controller, |
115 | | const PFetchArrowFlightSchemaRequest* request, |
116 | | PFetchArrowFlightSchemaResult* result, |
117 | | google::protobuf::Closure* done) override; |
118 | | |
119 | | void tablet_writer_open(google::protobuf::RpcController* controller, |
120 | | const PTabletWriterOpenRequest* request, |
121 | | PTabletWriterOpenResult* response, |
122 | | google::protobuf::Closure* done) override; |
123 | | |
124 | | void open_load_stream(google::protobuf::RpcController* controller, |
125 | | const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response, |
126 | | google::protobuf::Closure* done) override; |
127 | | |
128 | | void tablet_writer_add_block(google::protobuf::RpcController* controller, |
129 | | const PTabletWriterAddBlockRequest* request, |
130 | | PTabletWriterAddBlockResult* response, |
131 | | google::protobuf::Closure* done) override; |
132 | | |
133 | | void tablet_writer_add_block_by_http(google::protobuf::RpcController* controller, |
134 | | const ::doris::PEmptyRequest* request, |
135 | | PTabletWriterAddBlockResult* response, |
136 | | google::protobuf::Closure* done) override; |
137 | | |
138 | | void tablet_writer_cancel(google::protobuf::RpcController* controller, |
139 | | const PTabletWriterCancelRequest* request, |
140 | | PTabletWriterCancelResult* response, |
141 | | google::protobuf::Closure* done) override; |
142 | | |
143 | | void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, |
144 | | PProxyResult* response, google::protobuf::Closure* done) override; |
145 | | |
146 | | void update_cache(google::protobuf::RpcController* controller, |
147 | | const PUpdateCacheRequest* request, PCacheResponse* response, |
148 | | google::protobuf::Closure* done) override; |
149 | | |
150 | | void fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, |
151 | | PFetchCacheResult* result, google::protobuf::Closure* done) override; |
152 | | |
153 | | void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, |
154 | | PCacheResponse* response, google::protobuf::Closure* done) override; |
155 | | |
156 | | void merge_filter(::google::protobuf::RpcController* controller, |
157 | | const ::doris::PMergeFilterRequest* request, |
158 | | ::doris::PMergeFilterResponse* response, |
159 | | ::google::protobuf::Closure* done) override; |
160 | | |
161 | | void send_filter_size(::google::protobuf::RpcController* controller, |
162 | | const ::doris::PSendFilterSizeRequest* request, |
163 | | ::doris::PSendFilterSizeResponse* response, |
164 | | ::google::protobuf::Closure* done) override; |
165 | | |
166 | | void sync_filter_size(::google::protobuf::RpcController* controller, |
167 | | const ::doris::PSyncFilterSizeRequest* request, |
168 | | ::doris::PSyncFilterSizeResponse* response, |
169 | | ::google::protobuf::Closure* done) override; |
170 | | void apply_filterv2(::google::protobuf::RpcController* controller, |
171 | | const ::doris::PPublishFilterRequestV2* request, |
172 | | ::doris::PPublishFilterResponse* response, |
173 | | ::google::protobuf::Closure* done) override; |
174 | | void transmit_block(::google::protobuf::RpcController* controller, |
175 | | const ::doris::PTransmitDataParams* request, |
176 | | ::doris::PTransmitDataResult* response, |
177 | | ::google::protobuf::Closure* done) override; |
178 | | void transmit_block_by_http(::google::protobuf::RpcController* controller, |
179 | | const ::doris::PEmptyRequest* request, |
180 | | ::doris::PTransmitDataResult* response, |
181 | | ::google::protobuf::Closure* done) override; |
182 | | |
183 | | void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, |
184 | | PSendDataResult* response, google::protobuf::Closure* done) override; |
185 | | void commit(google::protobuf::RpcController* controller, const PCommitRequest* request, |
186 | | PCommitResult* response, google::protobuf::Closure* done) override; |
187 | | void rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, |
188 | | PRollbackResult* response, google::protobuf::Closure* done) override; |
189 | | void fold_constant_expr(google::protobuf::RpcController* controller, |
190 | | const PConstantExprRequest* request, PConstantExprResult* response, |
191 | | google::protobuf::Closure* done) override; |
192 | | void check_rpc_channel(google::protobuf::RpcController* controller, |
193 | | const PCheckRPCChannelRequest* request, |
194 | | PCheckRPCChannelResponse* response, |
195 | | google::protobuf::Closure* done) override; |
196 | | void reset_rpc_channel(google::protobuf::RpcController* controller, |
197 | | const PResetRPCChannelRequest* request, |
198 | | PResetRPCChannelResponse* response, |
199 | | google::protobuf::Closure* done) override; |
200 | | void hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request, |
201 | | PHandShakeResponse* response, google::protobuf::Closure* done) override; |
202 | | |
203 | | void report_stream_load_status(google::protobuf::RpcController* controller, |
204 | | const PReportStreamLoadStatusRequest* request, |
205 | | PReportStreamLoadStatusResponse* response, |
206 | | google::protobuf::Closure* done) override; |
207 | | |
208 | | void glob(google::protobuf::RpcController* controller, const PGlobRequest* request, |
209 | | PGlobResponse* response, google::protobuf::Closure* done) override; |
210 | | |
211 | | void group_commit_insert(google::protobuf::RpcController* controller, |
212 | | const PGroupCommitInsertRequest* request, |
213 | | PGroupCommitInsertResponse* response, |
214 | | google::protobuf::Closure* done) override; |
215 | | |
216 | | void get_wal_queue_size(google::protobuf::RpcController* controller, |
217 | | const PGetWalQueueSizeRequest* request, |
218 | | PGetWalQueueSizeResponse* response, |
219 | | google::protobuf::Closure* done) override; |
220 | | |
221 | | void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request, |
222 | | PMultiGetResponse* response, google::protobuf::Closure* done) override; |
223 | | |
224 | | void tablet_fetch_data(google::protobuf::RpcController* controller, |
225 | | const PTabletKeyLookupRequest* request, |
226 | | PTabletKeyLookupResponse* response, |
227 | | google::protobuf::Closure* done) override; |
228 | | |
229 | | void test_jdbc_connection(google::protobuf::RpcController* controller, |
230 | | const PJdbcTestConnectionRequest* request, |
231 | | PJdbcTestConnectionResult* result, |
232 | | google::protobuf::Closure* done) override; |
233 | | |
234 | | void fetch_remote_tablet_schema(google::protobuf::RpcController* controller, |
235 | | const PFetchRemoteSchemaRequest* request, |
236 | | PFetchRemoteSchemaResponse* response, |
237 | | google::protobuf::Closure* done) override; |
238 | | |
239 | | void get_be_resource(google::protobuf::RpcController* controller, |
240 | | const PGetBeResourceRequest* request, PGetBeResourceResponse* response, |
241 | | google::protobuf::Closure* done) override; |
242 | | |
243 | | private: |
244 | | void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, |
245 | | const PExecPlanFragmentRequest* request, |
246 | | PExecPlanFragmentResult* result, |
247 | | google::protobuf::Closure* done); |
248 | | |
249 | | Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version, |
250 | | bool compact, |
251 | | const std::function<void(RuntimeState*, Status*)>& cb = |
252 | | std::function<void(RuntimeState*, Status*)>()); |
253 | | |
254 | | Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); |
255 | | |
256 | | void _transmit_data(::google::protobuf::RpcController* controller, |
257 | | const ::doris::PTransmitDataParams* request, |
258 | | ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, |
259 | | const Status& extract_st); |
260 | | |
261 | | void _transmit_block(::google::protobuf::RpcController* controller, |
262 | | const ::doris::PTransmitDataParams* request, |
263 | | ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done, |
264 | | const Status& extract_st, const int64_t wait_for_worker); |
265 | | |
266 | | Status _tablet_fetch_data(const PTabletKeyLookupRequest* request, |
267 | | PTabletKeyLookupResponse* response); |
268 | | |
269 | | protected: |
270 | | ExecEnv* _exec_env = nullptr; |
271 | | |
272 | | // every brpc service request should put into thread pool |
273 | | // the reason see issue #16634 |
274 | | // define the interface for reading and writing data as heavy interface |
275 | | // otherwise as light interface |
276 | | FifoThreadPool _heavy_work_pool; |
277 | | FifoThreadPool _light_work_pool; |
278 | | FifoThreadPool _arrow_flight_work_pool; |
279 | | }; |
280 | | |
281 | | // `StorageEngine` mixin for `PInternalService` |
282 | | class PInternalServiceImpl final : public PInternalService { |
283 | | public: |
284 | | PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env); |
285 | | |
286 | | ~PInternalServiceImpl() override; |
287 | | void request_slave_tablet_pull_rowset(google::protobuf::RpcController* controller, |
288 | | const PTabletWriteSlaveRequest* request, |
289 | | PTabletWriteSlaveResult* response, |
290 | | google::protobuf::Closure* done) override; |
291 | | void response_slave_tablet_pull_rowset(google::protobuf::RpcController* controller, |
292 | | const PTabletWriteSlaveDoneRequest* request, |
293 | | PTabletWriteSlaveDoneResult* response, |
294 | | google::protobuf::Closure* done) override; |
295 | | |
296 | | void get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, |
297 | | const PFetchColIdsRequest* request, |
298 | | PFetchColIdsResponse* response, |
299 | | google::protobuf::Closure* done) override; |
300 | | |
301 | | void get_tablet_rowset_versions(google::protobuf::RpcController* controller, |
302 | | const PGetTabletVersionsRequest* request, |
303 | | PGetTabletVersionsResponse* response, |
304 | | google::protobuf::Closure* done) override; |
305 | | |
306 | | private: |
307 | | void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port, |
308 | | int64_t txn_id, int64_t tablet_id, int64_t node_id, |
309 | | bool is_succeed); |
310 | | Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response); |
311 | | |
312 | | void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, |
313 | | const PFetchColIdsRequest* request, |
314 | | PFetchColIdsResponse* response, |
315 | | google::protobuf::Closure* done); |
316 | | |
317 | | StorageEngine& _engine; |
318 | | }; |
319 | | } // namespace doris |