Coverage Report

Created: 2024-11-21 12:31

/root/doris/be/src/service/internal_service.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/closure_guard.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <string>
24
25
#include "common/status.h"
26
#include "util/work_thread_pool.hpp"
27
28
namespace google::protobuf {
29
class Closure;
30
class RpcController;
31
} // namespace google::protobuf
32
33
namespace doris {
34
35
class StorageEngine;
36
class ExecEnv;
37
class PHandShakeRequest;
38
class PHandShakeResponse;
39
class RuntimeState;
40
41
// Satisfied by any type T for which `response->mutable_status()` is a valid
// expression on a `T*`. It is used as the constraint on the offer_failed()
// overload that writes an error status into the response.
template <typename T>
42
concept CanCancel = requires(T* response) { response->mutable_status(); };
43
44
// Unconstrained overload of offer_failed(): only emits a WARNING log that
// contains pool.get_info(). `response` is never read or written here.
// A CanCancel-constrained overload with the same parameter list also exists in
// this file; being more constrained, it is preferred whenever T satisfies
// CanCancel, so this one serves the remaining response types.
//
// response: unused in this overload.
// done:     handed to a brpc::ClosureGuard for the duration of the call.
// pool:     the work pool; only get_info() is called on it, for the log line.
template <typename T>
45
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
46
0
    // NOTE(review): brpc::ClosureGuard is expected to run `done` when it leaves scope,
    // which would complete the RPC on return -- confirm against the brpc docs.
    brpc::ClosureGuard closure_guard(done);
47
0
    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
48
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
49
50
// offer_failed() overload for response types that satisfy CanCancel, i.e. that
// expose mutable_status(). It builds a CANCELLED Status whose message contains
// pool.get_info(), writes it into `response->mutable_status()`, and then emits
// a WARNING log with the same pool info.
//
// response: response message whose status field receives the error.
// done:     handed to a brpc::ClosureGuard for the duration of the call.
// pool:     the work pool; only get_info() is called on it.
template <CanCancel T>
51
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
52
0
    // NOTE(review): brpc::ClosureGuard is expected to run `done` when it leaves scope,
    // i.e. after the status below has been written -- confirm against the brpc docs.
    brpc::ClosureGuard closure_guard(done);
53
    // Build the protobuf error from a Status: Status encodes the backend info into
54
    // the error message, so the caller can tell which backend's pool is full.
55
0
    Status st = Status::Error<TStatusCode::CANCELLED>(
56
0
            "fail to offer request to the work pool, pool={}", pool.get_info());
57
0
    st.to_protobuf(response->mutable_status());
58
0
    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool="
59
0
                 << pool.get_info();
60
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_16PFetchDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
61
62
// Service class deriving from PBackendService (presumably the service generated
// from gen_cpp/internal_service.pb.h -- confirm). Every public member function
// below is declared `override` and takes the same four-argument shape:
// (controller, request, response/result, done).
// The class also owns three FifoThreadPool members (heavy / light / arrow
// flight); only declarations are visible in this header.
class PInternalService : public PBackendService {
63
public:
64
    // NOTE(review): single-argument constructor is not `explicit`, so an ExecEnv*
    // is implicitly convertible to this type -- confirm whether that is intended.
    PInternalService(ExecEnv* exec_env);
65
    ~PInternalService() override;
66
67
    // ---- RPC entry points (overrides of the base service's virtuals) ----
    void transmit_data(::google::protobuf::RpcController* controller,
68
                       const ::doris::PTransmitDataParams* request,
69
                       ::doris::PTransmitDataResult* response,
70
                       ::google::protobuf::Closure* done) override;
71
72
    // Same response type as transmit_data, but the request is a PEmptyRequest.
    void transmit_data_by_http(::google::protobuf::RpcController* controller,
73
                               const ::doris::PEmptyRequest* request,
74
                               ::doris::PTransmitDataResult* response,
75
                               ::google::protobuf::Closure* done) override;
76
77
    void exec_plan_fragment(google::protobuf::RpcController* controller,
78
                            const PExecPlanFragmentRequest* request,
79
                            PExecPlanFragmentResult* result,
80
                            google::protobuf::Closure* done) override;
81
82
    void exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
83
                                    const PExecPlanFragmentRequest* request,
84
                                    PExecPlanFragmentResult* result,
85
                                    google::protobuf::Closure* done) override;
86
87
    void exec_plan_fragment_start(google::protobuf::RpcController* controller,
88
                                  const PExecPlanFragmentStartRequest* request,
89
                                  PExecPlanFragmentResult* result,
90
                                  google::protobuf::Closure* done) override;
91
92
    void cancel_plan_fragment(google::protobuf::RpcController* controller,
93
                              const PCancelPlanFragmentRequest* request,
94
                              PCancelPlanFragmentResult* result,
95
                              google::protobuf::Closure* done) override;
96
97
    void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request,
98
                    PFetchDataResult* result, google::protobuf::Closure* done) override;
99
100
    void fetch_arrow_data(google::protobuf::RpcController* controller,
101
                          const PFetchArrowDataRequest* request, PFetchArrowDataResult* result,
102
                          google::protobuf::Closure* done) override;
103
104
    void outfile_write_success(google::protobuf::RpcController* controller,
105
                               const POutfileWriteSuccessRequest* request,
106
                               POutfileWriteSuccessResult* result,
107
                               google::protobuf::Closure* done) override;
108
109
    void fetch_table_schema(google::protobuf::RpcController* controller,
110
                            const PFetchTableSchemaRequest* request,
111
                            PFetchTableSchemaResult* result,
112
                            google::protobuf::Closure* done) override;
113
114
    void fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
115
                                   const PFetchArrowFlightSchemaRequest* request,
116
                                   PFetchArrowFlightSchemaResult* result,
117
                                   google::protobuf::Closure* done) override;
118
119
    void tablet_writer_open(google::protobuf::RpcController* controller,
120
                            const PTabletWriterOpenRequest* request,
121
                            PTabletWriterOpenResult* response,
122
                            google::protobuf::Closure* done) override;
123
124
    void open_load_stream(google::protobuf::RpcController* controller,
125
                          const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response,
126
                          google::protobuf::Closure* done) override;
127
128
    void tablet_writer_add_block(google::protobuf::RpcController* controller,
129
                                 const PTabletWriterAddBlockRequest* request,
130
                                 PTabletWriterAddBlockResult* response,
131
                                 google::protobuf::Closure* done) override;
132
133
    // Same response type as tablet_writer_add_block, but the request is a PEmptyRequest.
    void tablet_writer_add_block_by_http(google::protobuf::RpcController* controller,
134
                                         const ::doris::PEmptyRequest* request,
135
                                         PTabletWriterAddBlockResult* response,
136
                                         google::protobuf::Closure* done) override;
137
138
    void tablet_writer_cancel(google::protobuf::RpcController* controller,
139
                              const PTabletWriterCancelRequest* request,
140
                              PTabletWriterCancelResult* response,
141
                              google::protobuf::Closure* done) override;
142
143
    void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request,
144
                  PProxyResult* response, google::protobuf::Closure* done) override;
145
146
    void update_cache(google::protobuf::RpcController* controller,
147
                      const PUpdateCacheRequest* request, PCacheResponse* response,
148
                      google::protobuf::Closure* done) override;
149
150
    void fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request,
151
                     PFetchCacheResult* result, google::protobuf::Closure* done) override;
152
153
    void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request,
154
                     PCacheResponse* response, google::protobuf::Closure* done) override;
155
156
    void merge_filter(::google::protobuf::RpcController* controller,
157
                      const ::doris::PMergeFilterRequest* request,
158
                      ::doris::PMergeFilterResponse* response,
159
                      ::google::protobuf::Closure* done) override;
160
161
    void send_filter_size(::google::protobuf::RpcController* controller,
162
                          const ::doris::PSendFilterSizeRequest* request,
163
                          ::doris::PSendFilterSizeResponse* response,
164
                          ::google::protobuf::Closure* done) override;
165
166
    void sync_filter_size(::google::protobuf::RpcController* controller,
167
                          const ::doris::PSyncFilterSizeRequest* request,
168
                          ::doris::PSyncFilterSizeResponse* response,
169
                          ::google::protobuf::Closure* done) override;
170
    void apply_filterv2(::google::protobuf::RpcController* controller,
171
                        const ::doris::PPublishFilterRequestV2* request,
172
                        ::doris::PPublishFilterResponse* response,
173
                        ::google::protobuf::Closure* done) override;
174
    // Takes the same request/response types as transmit_data.
    void transmit_block(::google::protobuf::RpcController* controller,
175
                        const ::doris::PTransmitDataParams* request,
176
                        ::doris::PTransmitDataResult* response,
177
                        ::google::protobuf::Closure* done) override;
178
    void transmit_block_by_http(::google::protobuf::RpcController* controller,
179
                                const ::doris::PEmptyRequest* request,
180
                                ::doris::PTransmitDataResult* response,
181
                                ::google::protobuf::Closure* done) override;
182
183
    void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request,
184
                   PSendDataResult* response, google::protobuf::Closure* done) override;
185
    void commit(google::protobuf::RpcController* controller, const PCommitRequest* request,
186
                PCommitResult* response, google::protobuf::Closure* done) override;
187
    void rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request,
188
                  PRollbackResult* response, google::protobuf::Closure* done) override;
189
    void fold_constant_expr(google::protobuf::RpcController* controller,
190
                            const PConstantExprRequest* request, PConstantExprResult* response,
191
                            google::protobuf::Closure* done) override;
192
    void check_rpc_channel(google::protobuf::RpcController* controller,
193
                           const PCheckRPCChannelRequest* request,
194
                           PCheckRPCChannelResponse* response,
195
                           google::protobuf::Closure* done) override;
196
    void reset_rpc_channel(google::protobuf::RpcController* controller,
197
                           const PResetRPCChannelRequest* request,
198
                           PResetRPCChannelResponse* response,
199
                           google::protobuf::Closure* done) override;
200
    void hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request,
201
                    PHandShakeResponse* response, google::protobuf::Closure* done) override;
202
203
    void report_stream_load_status(google::protobuf::RpcController* controller,
204
                                   const PReportStreamLoadStatusRequest* request,
205
                                   PReportStreamLoadStatusResponse* response,
206
                                   google::protobuf::Closure* done) override;
207
208
    void glob(google::protobuf::RpcController* controller, const PGlobRequest* request,
209
              PGlobResponse* response, google::protobuf::Closure* done) override;
210
211
    void group_commit_insert(google::protobuf::RpcController* controller,
212
                             const PGroupCommitInsertRequest* request,
213
                             PGroupCommitInsertResponse* response,
214
                             google::protobuf::Closure* done) override;
215
216
    void get_wal_queue_size(google::protobuf::RpcController* controller,
217
                            const PGetWalQueueSizeRequest* request,
218
                            PGetWalQueueSizeResponse* response,
219
                            google::protobuf::Closure* done) override;
220
221
    void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request,
222
                       PMultiGetResponse* response, google::protobuf::Closure* done) override;
223
224
    void tablet_fetch_data(google::protobuf::RpcController* controller,
225
                           const PTabletKeyLookupRequest* request,
226
                           PTabletKeyLookupResponse* response,
227
                           google::protobuf::Closure* done) override;
228
229
    void test_jdbc_connection(google::protobuf::RpcController* controller,
230
                              const PJdbcTestConnectionRequest* request,
231
                              PJdbcTestConnectionResult* result,
232
                              google::protobuf::Closure* done) override;
233
234
    void fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
235
                                    const PFetchRemoteSchemaRequest* request,
236
                                    PFetchRemoteSchemaResponse* response,
237
                                    google::protobuf::Closure* done) override;
238
239
    void get_be_resource(google::protobuf::RpcController* controller,
240
                         const PGetBeResourceRequest* request, PGetBeResourceResponse* response,
241
                         google::protobuf::Closure* done) override;
242
243
private:
244
    // ---- Private helpers (not overrides; definitions are not in this header) ----
    // Same parameter list as exec_plan_fragment(); presumably its worker-side body -- confirm.
    void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
245
                                        const PExecPlanFragmentRequest* request,
246
                                        PExecPlanFragmentResult* result,
247
                                        google::protobuf::Closure* done);
248
249
    // `cb` defaults to an empty std::function when the caller omits it.
    // NOTE(review): std::function is used here, but <functional> is not among the
    // includes visible in this header; it is presumably pulled in transitively -- confirm.
    Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version,
250
                                    bool compact,
251
                                    const std::function<void(RuntimeState*, Status*)>& cb =
252
                                            std::function<void(RuntimeState*, Status*)>());
253
254
    Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
255
256
    // Takes the transmit_data() arguments plus an extra Status, `extract_st`.
    void _transmit_data(::google::protobuf::RpcController* controller,
257
                        const ::doris::PTransmitDataParams* request,
258
                        ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
259
                        const Status& extract_st);
260
261
    // Takes the transmit_block() arguments plus `extract_st` and `wait_for_worker`.
    // The top-level `const` on the by-value `wait_for_worker` does not affect the
    // function's signature in a declaration.
    void _transmit_block(::google::protobuf::RpcController* controller,
262
                         const ::doris::PTransmitDataParams* request,
263
                         ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
264
                         const Status& extract_st, const int64_t wait_for_worker);
265
266
    Status _tablet_fetch_data(const PTabletKeyLookupRequest* request,
267
                              PTabletKeyLookupResponse* response);
268
269
protected:
270
    // Non-owning pointer as far as this header shows (raw pointer, defaulted to nullptr).
    ExecEnv* _exec_env = nullptr;
271
272
    // Every brpc service request should be handed off to a thread pool;
273
    // see issue #16634 for the reason.
274
    // Interfaces that read or write data are treated as "heavy" interfaces;
275
    // all others are treated as "light" interfaces.
276
    FifoThreadPool _heavy_work_pool;
277
    FifoThreadPool _light_work_pool;
278
    // Presumably dedicated to the arrow flight requests -- confirm in the implementation.
    FifoThreadPool _arrow_flight_work_pool;
279
};
280
281
// `StorageEngine` mixin for `PInternalService`:
// a `final` subclass that additionally holds a reference to a StorageEngine
// (`_engine`) and declares four more RPC overrides plus their private helpers.
// Only declarations are visible in this header.
282
class PInternalServiceImpl final : public PInternalService {
283
public:
284
    // `engine` is taken by reference; the class stores a StorageEngine& member (`_engine`).
    PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env);
285
286
    ~PInternalServiceImpl() override;
287
    // ---- RPC entry points declared by this subclass (all `override`) ----
    void request_slave_tablet_pull_rowset(google::protobuf::RpcController* controller,
288
                                          const PTabletWriteSlaveRequest* request,
289
                                          PTabletWriteSlaveResult* response,
290
                                          google::protobuf::Closure* done) override;
291
    void response_slave_tablet_pull_rowset(google::protobuf::RpcController* controller,
292
                                           const PTabletWriteSlaveDoneRequest* request,
293
                                           PTabletWriteSlaveDoneResult* response,
294
                                           google::protobuf::Closure* done) override;
295
296
    void get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
297
                                      const PFetchColIdsRequest* request,
298
                                      PFetchColIdsResponse* response,
299
                                      google::protobuf::Closure* done) override;
300
301
    void get_tablet_rowset_versions(google::protobuf::RpcController* controller,
302
                                    const PGetTabletVersionsRequest* request,
303
                                    PGetTabletVersionsResponse* response,
304
                                    google::protobuf::Closure* done) override;
305
306
private:
307
    // ---- Private helpers (definitions are not in this header) ----
    void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port,
308
                                     int64_t txn_id, int64_t tablet_id, int64_t node_id,
309
                                     bool is_succeed);
310
    // Unlike the RPC entry points, takes the request by const reference and returns a Status.
    Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response);
311
312
    // Same parameter list as get_column_ids_by_tablet_ids(), without `override`.
    void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
313
                                       const PFetchColIdsRequest* request,
314
                                       PFetchColIdsResponse* response,
315
                                       google::protobuf::Closure* done);
316
317
    // Reference member: the referenced StorageEngine must outlive this object.
    StorageEngine& _engine;
318
};
319
} // namespace doris