Coverage Report

Created: 2025-03-12 11:55

/root/doris/be/src/service/internal_service.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/closure_guard.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <string>
24
25
#include "common/status.h"
26
#include "util/work_thread_pool.hpp"
27
28
namespace google::protobuf {
29
class Closure;
30
class RpcController;
31
} // namespace google::protobuf
32
33
namespace doris {
34
35
class StorageEngine;
36
class ExecEnv;
37
class PHandShakeRequest;
38
class PHandShakeResponse;
39
class RuntimeState;
40
41
// Concept satisfied by any type T for which `response->mutable_status()` is a
// valid expression on a `T*`. It is used below to select the `offer_failed`
// overload that writes an error status into the response.
template <typename T>
42
concept CanCancel = requires(T* response) { response->mutable_status(); };
43
44
// Unconstrained overload of `offer_failed`, for the case described by its log
// message: a request could not be offered to the work pool.
//
// `response` is not touched by this overload; the failure is only logged.
// `done` is handed to a brpc::ClosureGuard (presumably so the closure is run
// when the guard leaves scope -- confirm against the brpc documentation).
// `pool` is only queried for `get_info()` to enrich the warning.
template <typename T>
45
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
46
0
    brpc::ClosureGuard closure_guard(done);
47
0
    LOG(WARNING) << "fail to offer request to the work pool, pool=" << pool.get_info();
48
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PTabletWriterCancelResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_14PCacheResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PFetchCacheResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
49
50
// Overload of `offer_failed` for response types that satisfy `CanCancel`,
// i.e. that expose `mutable_status()`.
//
// In addition to logging, it builds a CANCELLED `Status` whose message
// contains `pool.get_info()` and serializes it into the response's status
// field. `done` is handed to a brpc::ClosureGuard (presumably so the closure
// is run when the guard leaves scope -- confirm against the brpc documentation).
template <CanCancel T>
51
0
void offer_failed(T* response, google::protobuf::Closure* done, const FifoThreadPool& pool) {
52
0
    brpc::ClosureGuard closure_guard(done);
53
    // Build the protobuf message from a Status, because Status encodes the Backend info
54
    // into the error message, so we can tell which backend's pool is full.
55
0
    Status st = Status::Error<TStatusCode::CANCELLED>(
56
0
            "fail to offer request to the work pool, pool={}", pool.get_info());
57
0
    st.to_protobuf(response->mutable_status());
58
0
    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, pool="
59
0
                 << pool.get_info();
60
0
}
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriterOpenResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PExecPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23POpenLoadStreamResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriterAddBlockResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PCancelPlanFragmentResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_21PFetchArrowDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26POutfileWriteSuccessResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PFetchTableSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_29PFetchArrowFlightSchemaResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PTabletKeyLookupResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_25PJdbcTestConnectionResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PFetchColIdsResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PFetchRemoteSchemaResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_12PProxyResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_20PMergeFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSendFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PSyncFilterSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PPublishFilterResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PSendDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PCommitResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_15PRollbackResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PConstantExprResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_19PTransmitDataResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PCheckRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PResetRPCChannelResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_23PTabletWriteSlaveResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_27PTabletWriteSlaveDoneResultEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_17PMultiGetResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_13PGlobResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_26PGroupCommitInsertResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_24PGetWalQueueSizeResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
Unexecuted instantiation: _ZN5doris12offer_failedINS_22PGetBeResourceResponseEEEvPT_PN6google8protobuf7ClosureERKNS_14WorkThreadPoolILb0EEE
61
62
// Service class derived from `PBackendService` (presumably the service base
// generated into gen_cpp/internal_service.pb.h -- confirm). It declares
// overrides for the RPC entry points listed below; each takes the controller,
// the request, the response/result message and the `done` closure.
//
// The class holds a non-owning-looking raw `ExecEnv*` and three
// `FifoThreadPool` members (heavy, light, arrow flight). Only declarations are
// visible here; the implementations live elsewhere.
class PInternalService : public PBackendService {
63
public:
64
    // NOTE(review): single-argument constructor is not marked `explicit`, so
    // an `ExecEnv*` converts implicitly to `PInternalService` -- confirm that
    // this is intended.
    PInternalService(ExecEnv* exec_env);
65
    ~PInternalService() override;
66
67
    void exec_plan_fragment(google::protobuf::RpcController* controller,
68
                            const PExecPlanFragmentRequest* request,
69
                            PExecPlanFragmentResult* result,
70
                            google::protobuf::Closure* done) override;
71
72
    void exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
73
                                    const PExecPlanFragmentRequest* request,
74
                                    PExecPlanFragmentResult* result,
75
                                    google::protobuf::Closure* done) override;
76
77
    void exec_plan_fragment_start(google::protobuf::RpcController* controller,
78
                                  const PExecPlanFragmentStartRequest* request,
79
                                  PExecPlanFragmentResult* result,
80
                                  google::protobuf::Closure* done) override;
81
82
    void cancel_plan_fragment(google::protobuf::RpcController* controller,
83
                              const PCancelPlanFragmentRequest* request,
84
                              PCancelPlanFragmentResult* result,
85
                              google::protobuf::Closure* done) override;
86
87
    void fetch_data(google::protobuf::RpcController* controller, const PFetchDataRequest* request,
88
                    PFetchDataResult* result, google::protobuf::Closure* done) override;
89
90
    void fetch_arrow_data(google::protobuf::RpcController* controller,
91
                          const PFetchArrowDataRequest* request, PFetchArrowDataResult* result,
92
                          google::protobuf::Closure* done) override;
93
94
    void outfile_write_success(google::protobuf::RpcController* controller,
95
                               const POutfileWriteSuccessRequest* request,
96
                               POutfileWriteSuccessResult* result,
97
                               google::protobuf::Closure* done) override;
98
99
    void fetch_table_schema(google::protobuf::RpcController* controller,
100
                            const PFetchTableSchemaRequest* request,
101
                            PFetchTableSchemaResult* result,
102
                            google::protobuf::Closure* done) override;
103
104
    void fetch_arrow_flight_schema(google::protobuf::RpcController* controller,
105
                                   const PFetchArrowFlightSchemaRequest* request,
106
                                   PFetchArrowFlightSchemaResult* result,
107
                                   google::protobuf::Closure* done) override;
108
109
    void tablet_writer_open(google::protobuf::RpcController* controller,
110
                            const PTabletWriterOpenRequest* request,
111
                            PTabletWriterOpenResult* response,
112
                            google::protobuf::Closure* done) override;
113
114
    void open_load_stream(google::protobuf::RpcController* controller,
115
                          const POpenLoadStreamRequest* request, POpenLoadStreamResponse* response,
116
                          google::protobuf::Closure* done) override;
117
118
    void tablet_writer_add_block(google::protobuf::RpcController* controller,
119
                                 const PTabletWriterAddBlockRequest* request,
120
                                 PTabletWriterAddBlockResult* response,
121
                                 google::protobuf::Closure* done) override;
122
123
    void tablet_writer_add_block_by_http(google::protobuf::RpcController* controller,
124
                                         const ::doris::PEmptyRequest* request,
125
                                         PTabletWriterAddBlockResult* response,
126
                                         google::protobuf::Closure* done) override;
127
128
    void tablet_writer_cancel(google::protobuf::RpcController* controller,
129
                              const PTabletWriterCancelRequest* request,
130
                              PTabletWriterCancelResult* response,
131
                              google::protobuf::Closure* done) override;
132
133
    void get_info(google::protobuf::RpcController* controller, const PProxyRequest* request,
134
                  PProxyResult* response, google::protobuf::Closure* done) override;
135
136
    void update_cache(google::protobuf::RpcController* controller,
137
                      const PUpdateCacheRequest* request, PCacheResponse* response,
138
                      google::protobuf::Closure* done) override;
139
140
    void fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request,
141
                     PFetchCacheResult* result, google::protobuf::Closure* done) override;
142
143
    void clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request,
144
                     PCacheResponse* response, google::protobuf::Closure* done) override;
145
146
    void merge_filter(::google::protobuf::RpcController* controller,
147
                      const ::doris::PMergeFilterRequest* request,
148
                      ::doris::PMergeFilterResponse* response,
149
                      ::google::protobuf::Closure* done) override;
150
151
    void send_filter_size(::google::protobuf::RpcController* controller,
152
                          const ::doris::PSendFilterSizeRequest* request,
153
                          ::doris::PSendFilterSizeResponse* response,
154
                          ::google::protobuf::Closure* done) override;
155
156
    void sync_filter_size(::google::protobuf::RpcController* controller,
157
                          const ::doris::PSyncFilterSizeRequest* request,
158
                          ::doris::PSyncFilterSizeResponse* response,
159
                          ::google::protobuf::Closure* done) override;
160
    void apply_filterv2(::google::protobuf::RpcController* controller,
161
                        const ::doris::PPublishFilterRequestV2* request,
162
                        ::doris::PPublishFilterResponse* response,
163
                        ::google::protobuf::Closure* done) override;
164
    void transmit_block(::google::protobuf::RpcController* controller,
165
                        const ::doris::PTransmitDataParams* request,
166
                        ::doris::PTransmitDataResult* response,
167
                        ::google::protobuf::Closure* done) override;
168
    void transmit_block_by_http(::google::protobuf::RpcController* controller,
169
                                const ::doris::PEmptyRequest* request,
170
                                ::doris::PTransmitDataResult* response,
171
                                ::google::protobuf::Closure* done) override;
172
173
    void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request,
174
                   PSendDataResult* response, google::protobuf::Closure* done) override;
175
    void commit(google::protobuf::RpcController* controller, const PCommitRequest* request,
176
                PCommitResult* response, google::protobuf::Closure* done) override;
177
    void rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request,
178
                  PRollbackResult* response, google::protobuf::Closure* done) override;
179
    void fold_constant_expr(google::protobuf::RpcController* controller,
180
                            const PConstantExprRequest* request, PConstantExprResult* response,
181
                            google::protobuf::Closure* done) override;
182
    void check_rpc_channel(google::protobuf::RpcController* controller,
183
                           const PCheckRPCChannelRequest* request,
184
                           PCheckRPCChannelResponse* response,
185
                           google::protobuf::Closure* done) override;
186
    void reset_rpc_channel(google::protobuf::RpcController* controller,
187
                           const PResetRPCChannelRequest* request,
188
                           PResetRPCChannelResponse* response,
189
                           google::protobuf::Closure* done) override;
190
    void hand_shake(google::protobuf::RpcController* controller, const PHandShakeRequest* request,
191
                    PHandShakeResponse* response, google::protobuf::Closure* done) override;
192
193
    void report_stream_load_status(google::protobuf::RpcController* controller,
194
                                   const PReportStreamLoadStatusRequest* request,
195
                                   PReportStreamLoadStatusResponse* response,
196
                                   google::protobuf::Closure* done) override;
197
198
    void glob(google::protobuf::RpcController* controller, const PGlobRequest* request,
199
              PGlobResponse* response, google::protobuf::Closure* done) override;
200
201
    void group_commit_insert(google::protobuf::RpcController* controller,
202
                             const PGroupCommitInsertRequest* request,
203
                             PGroupCommitInsertResponse* response,
204
                             google::protobuf::Closure* done) override;
205
206
    void get_wal_queue_size(google::protobuf::RpcController* controller,
207
                            const PGetWalQueueSizeRequest* request,
208
                            PGetWalQueueSizeResponse* response,
209
                            google::protobuf::Closure* done) override;
210
211
    void multiget_data(google::protobuf::RpcController* controller, const PMultiGetRequest* request,
212
                       PMultiGetResponse* response, google::protobuf::Closure* done) override;
213
214
    void tablet_fetch_data(google::protobuf::RpcController* controller,
215
                           const PTabletKeyLookupRequest* request,
216
                           PTabletKeyLookupResponse* response,
217
                           google::protobuf::Closure* done) override;
218
219
    void test_jdbc_connection(google::protobuf::RpcController* controller,
220
                              const PJdbcTestConnectionRequest* request,
221
                              PJdbcTestConnectionResult* result,
222
                              google::protobuf::Closure* done) override;
223
224
    void fetch_remote_tablet_schema(google::protobuf::RpcController* controller,
225
                                    const PFetchRemoteSchemaRequest* request,
226
                                    PFetchRemoteSchemaResponse* response,
227
                                    google::protobuf::Closure* done) override;
228
229
    void get_be_resource(google::protobuf::RpcController* controller,
230
                         const PGetBeResourceRequest* request, PGetBeResourceResponse* response,
231
                         google::protobuf::Closure* done) override;
232
233
private:
234
    void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller,
235
                                        const PExecPlanFragmentRequest* request,
236
                                        PExecPlanFragmentResult* result,
237
                                        google::protobuf::Closure* done);
238
239
    // `cb` defaults to an empty std::function when the caller passes nothing.
    Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version,
240
                                    bool compact,
241
                                    const std::function<void(RuntimeState*, Status*)>& cb =
242
                                            std::function<void(RuntimeState*, Status*)>());
243
244
    Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
245
246
    void _transmit_block(::google::protobuf::RpcController* controller,
247
                         const ::doris::PTransmitDataParams* request,
248
                         ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
249
                         const Status& extract_st, const int64_t wait_for_worker);
250
251
    Status _tablet_fetch_data(const PTabletKeyLookupRequest* request,
252
                              PTabletKeyLookupResponse* response);
253
254
protected:
255
    ExecEnv* _exec_env = nullptr;
256
257
    // Every brpc service request should be put into a thread pool;
258
    // for the reason, see issue #16634.
259
    // Interfaces that read or write data are defined as heavy interfaces;
260
    // all other interfaces are light interfaces.
261
    FifoThreadPool _heavy_work_pool;
262
    FifoThreadPool _light_work_pool;
263
    FifoThreadPool _arrow_flight_work_pool;
264
};
265
266
// `StorageEngine` mixin for `PInternalService`
267
// Final subclass of `PInternalService` that additionally holds a reference to
// a `StorageEngine` and declares overrides for the RPC entry points below.
// Only declarations are visible here; the implementations live elsewhere.
class PInternalServiceImpl final : public PInternalService {
268
public:
269
    PInternalServiceImpl(StorageEngine& engine, ExecEnv* exec_env);
270
271
    ~PInternalServiceImpl() override;
272
    void request_slave_tablet_pull_rowset(google::protobuf::RpcController* controller,
273
                                          const PTabletWriteSlaveRequest* request,
274
                                          PTabletWriteSlaveResult* response,
275
                                          google::protobuf::Closure* done) override;
276
    void response_slave_tablet_pull_rowset(google::protobuf::RpcController* controller,
277
                                           const PTabletWriteSlaveDoneRequest* request,
278
                                           PTabletWriteSlaveDoneResult* response,
279
                                           google::protobuf::Closure* done) override;
280
281
    void get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
282
                                      const PFetchColIdsRequest* request,
283
                                      PFetchColIdsResponse* response,
284
                                      google::protobuf::Closure* done) override;
285
286
    void get_tablet_rowset_versions(google::protobuf::RpcController* controller,
287
                                    const PGetTabletVersionsRequest* request,
288
                                    PGetTabletVersionsResponse* response,
289
                                    google::protobuf::Closure* done) override;
290
291
private:
292
    void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port,
293
                                     int64_t txn_id, int64_t tablet_id, int64_t node_id,
294
                                     bool is_succeed);
295
    Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response);
296
297
    // Non-override helper with the same parameter list as the public
    // `get_column_ids_by_tablet_ids` RPC entry point.
    void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
298
                                       const PFetchColIdsRequest* request,
299
                                       PFetchColIdsResponse* response,
300
                                       google::protobuf::Closure* done);
301
302
    // Held by reference: the referenced StorageEngine must outlive this object.
    StorageEngine& _engine;
303
};
304
} // namespace doris