be/src/util/brpc_closure.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <google/protobuf/stubs/common.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <utility> |
24 | | |
25 | | #include "runtime/query_context.h" |
26 | | #include "runtime/thread_context.h" |
27 | | #include "service/brpc.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | | template <typename Response> |
32 | | class DummyBrpcCallback { |
33 | | ENABLE_FACTORY_CREATOR(DummyBrpcCallback); |
34 | | |
35 | | public: |
36 | | using ResponseType = Response; |
37 | 2.86M | DummyBrpcCallback() { |
38 | 2.86M | cntl_ = std::make_shared<brpc::Controller>(); |
39 | 2.86M | call_id_ = cntl_->call_id(); |
40 | 2.86M | response_ = std::make_shared<Response>(); |
41 | 2.86M | } _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEC2Ev Line | Count | Source | 37 | 139 | DummyBrpcCallback() { | 38 | 139 | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 139 | call_id_ = cntl_->call_id(); | 40 | 139 | response_ = std::make_shared<Response>(); | 41 | 139 | } |
_ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEEC2Ev Line | Count | Source | 37 | 1.76k | DummyBrpcCallback() { | 38 | 1.76k | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 1.76k | call_id_ = cntl_->call_id(); | 40 | 1.76k | response_ = std::make_shared<Response>(); | 41 | 1.76k | } |
_ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEC2Ev Line | Count | Source | 37 | 139 | DummyBrpcCallback() { | 38 | 139 | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 139 | call_id_ = cntl_->call_id(); | 40 | 139 | response_ = std::make_shared<Response>(); | 41 | 139 | } |
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEEC2Ev Line | Count | Source | 37 | 2.37k | DummyBrpcCallback() { | 38 | 2.37k | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 2.37k | call_id_ = cntl_->call_id(); | 40 | 2.37k | response_ = std::make_shared<Response>(); | 41 | 2.37k | } |
_ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEEC2Ev Line | Count | Source | 37 | 67.6k | DummyBrpcCallback() { | 38 | 67.6k | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 67.6k | call_id_ = cntl_->call_id(); | 40 | 67.6k | response_ = std::make_shared<Response>(); | 41 | 67.6k | } |
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEC2Ev Line | Count | Source | 37 | 67.4k | DummyBrpcCallback() { | 38 | 67.4k | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 67.4k | call_id_ = cntl_->call_id(); | 40 | 67.4k | response_ = std::make_shared<Response>(); | 41 | 67.4k | } |
_ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEC2Ev Line | Count | Source | 37 | 94 | DummyBrpcCallback() { | 38 | 94 | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 94 | call_id_ = cntl_->call_id(); | 40 | 94 | response_ = std::make_shared<Response>(); | 41 | 94 | } |
_ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEEC2Ev Line | Count | Source | 37 | 2.72M | DummyBrpcCallback() { | 38 | 2.72M | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 2.72M | call_id_ = cntl_->call_id(); | 40 | 2.72M | response_ = std::make_shared<Response>(); | 41 | 2.72M | } |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEC2Ev |
42 | | |
43 | 2.89M | virtual ~DummyBrpcCallback() = default; _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEED2Ev Line | Count | Source | 43 | 139 | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEED2Ev Line | Count | Source | 43 | 1.76k | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEED2Ev Line | Count | Source | 43 | 139 | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEED2Ev Line | Count | Source | 43 | 2.37k | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEED2Ev Line | Count | Source | 43 | 67.7k | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEED2Ev Line | Count | Source | 43 | 67.9k | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEED2Ev Line | Count | Source | 43 | 94 | virtual ~DummyBrpcCallback() = default; |
_ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEED2Ev Line | Count | Source | 43 | 2.75M | virtual ~DummyBrpcCallback() = default; |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEED2Ev |
44 | | |
45 | 67.8k | virtual void call() {}Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4callEv _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4callEv Line | Count | Source | 45 | 2 | virtual void call() {} |
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4callEv Line | Count | Source | 45 | 4 | virtual void call() {} |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4callEv _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4callEv Line | Count | Source | 45 | 67.8k | virtual void call() {} |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4callEv |
46 | | |
47 | 67.9k | virtual void join() { brpc::Join(call_id_); }Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4joinEv _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4joinEv Line | Count | Source | 47 | 67.9k | virtual void join() { brpc::Join(call_id_); } |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4joinEv |
48 | | |
49 | | // according to brpc doc, we MUST save the call_id before rpc done. use this id to join. |
50 | | // if a rpc is already done then we get the id and join, it's wrong. |
51 | | brpc::CallId call_id_; |
52 | | // controller has to be the same lifecycle with the closure, because brpc may use |
53 | | // it in any stage of the rpc. |
54 | | std::shared_ptr<brpc::Controller> cntl_; |
55 | | // We do not know if brpc will use request or response after brpc method returns. |
56 | | // So that we need keep a shared ptr here to ensure that brpc could use req/rep |
57 | | // at any stage. |
58 | | std::shared_ptr<Response> response_; |
59 | | }; |
60 | | |
61 | | // The closure will be deleted after callback. |
62 | | // It could only be created by using shared ptr or unique ptr. |
63 | | // It will hold a weak ptr of T and call run of T |
64 | | // Callback() { |
65 | | // xxxx; |
66 | | // public |
67 | | // void run() { |
68 | | // logxxx |
69 | | // } |
70 | | // } |
71 | | // |
72 | | // std::shared_ptr<Callback> b; |
73 | | // |
74 | | // std::unique_ptr<AutoReleaseClosure> a(b); |
75 | | // brpc_call(a.release()); |
76 | | |
77 | | template <typename T> |
78 | | concept HasStatus = requires(T* response) { response->status(); }; |
79 | | |
80 | | template <typename Request, typename Callback> |
81 | | class AutoReleaseClosure : public google::protobuf::Closure { |
82 | | using Weak = typename std::shared_ptr<Callback>::weak_type; |
83 | | using ResponseType = typename Callback::ResponseType; |
84 | | ENABLE_FACTORY_CREATOR(AutoReleaseClosure); |
85 | | |
86 | | public: |
87 | | AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback, |
88 | | std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {}) |
89 | 2.89M | : request_(req), callback_(callback), context_(std::move(context)) { |
90 | 2.89M | this->cntl_ = callback->cntl_; |
91 | 2.89M | this->response_ = callback->response_; |
92 | 2.89M | } _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 139 | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 139 | this->cntl_ = callback->cntl_; | 91 | 139 | this->response_ = callback->response_; | 92 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 1.76k | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 1.76k | this->cntl_ = callback->cntl_; | 91 | 1.76k | this->response_ = callback->response_; | 92 | 1.76k | } |
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 139 | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 139 | this->cntl_ = callback->cntl_; | 91 | 139 | this->response_ = callback->response_; | 92 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 2.37k | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 2.37k | this->cntl_ = callback->cntl_; | 91 | 2.37k | this->response_ = callback->response_; | 92 | 2.37k | } |
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 67.5k | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 67.5k | this->cntl_ = callback->cntl_; | 91 | 67.5k | this->response_ = callback->response_; | 92 | 67.5k | } |
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 70.0k | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 70.0k | this->cntl_ = callback->cntl_; | 91 | 70.0k | this->response_ = callback->response_; | 92 | 70.0k | } |
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 94 | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 94 | this->cntl_ = callback->cntl_; | 91 | 94 | this->response_ = callback->response_; | 92 | 94 | } |
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 2.75M | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 2.75M | this->cntl_ = callback->cntl_; | 91 | 2.75M | this->response_ = callback->response_; | 92 | 2.75M | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE |
93 | | |
94 | 2.88M | ~AutoReleaseClosure() override = default; _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEED2Ev Line | Count | Source | 94 | 139 | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEED2Ev Line | Count | Source | 94 | 1.76k | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEED2Ev Line | Count | Source | 94 | 139 | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEED2Ev Line | Count | Source | 94 | 2.37k | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEED2Ev Line | Count | Source | 94 | 67.8k | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEED2Ev Line | Count | Source | 94 | 69.9k | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEED2Ev Line | Count | Source | 94 | 94 | ~AutoReleaseClosure() override = default; |
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEED2Ev Line | Count | Source | 94 | 2.73M | ~AutoReleaseClosure() override = default; |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEED2Ev |
95 | | |
96 | | // Will delete itself |
97 | 2.89M | void Run() override { |
98 | 2.89M | Defer defer {[&]() { delete this; }};_ZZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 139 | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 1.76k | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 139 | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 2.37k | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 67.8k | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 70.0k | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 94 | Defer defer {[&]() { delete this; }}; |
_ZZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 2.74M | Defer defer {[&]() { delete this; }}; |
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEvENKUlvE_clEv |
99 | | // If lock failed, it means the callback object is deconstructed, then no need |
100 | | // to deal with the callback any more. |
101 | 2.89M | if (auto tmp = callback_.lock()) { |
102 | 2.89M | tmp->call(); |
103 | 2.89M | } |
104 | 2.89M | if (cntl_->Failed()) { |
105 | 1 | _process_if_rpc_failed(); |
106 | 2.89M | } else { |
107 | 2.89M | _process_status<ResponseType>(response_.get()); |
108 | 2.89M | } |
109 | 2.89M | } _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEv Line | Count | Source | 97 | 139 | void Run() override { | 98 | 139 | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 139 | if (auto tmp = callback_.lock()) { | 102 | 0 | tmp->call(); | 103 | 0 | } | 104 | 139 | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 139 | } else { | 107 | 139 | _process_status<ResponseType>(response_.get()); | 108 | 139 | } | 109 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEv Line | Count | Source | 97 | 1.76k | void Run() override { | 98 | 1.76k | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 1.76k | if (auto tmp = callback_.lock()) { | 102 | 0 | tmp->call(); | 103 | 0 | } | 104 | 1.76k | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 1.76k | } else { | 107 | 1.76k | _process_status<ResponseType>(response_.get()); | 108 | 1.76k | } | 109 | 1.76k | } |
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEv Line | Count | Source | 97 | 139 | void Run() override { | 98 | 139 | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 139 | if (auto tmp = callback_.lock()) { | 102 | 2 | tmp->call(); | 103 | 2 | } | 104 | 139 | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 139 | } else { | 107 | 139 | _process_status<ResponseType>(response_.get()); | 108 | 139 | } | 109 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEv Line | Count | Source | 97 | 2.37k | void Run() override { | 98 | 2.37k | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 2.37k | if (auto tmp = callback_.lock()) { | 102 | 4 | tmp->call(); | 103 | 4 | } | 104 | 2.37k | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 2.37k | } else { | 107 | 2.37k | _process_status<ResponseType>(response_.get()); | 108 | 2.37k | } | 109 | 2.37k | } |
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEv Line | Count | Source | 97 | 67.8k | void Run() override { | 98 | 67.8k | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 67.8k | if (auto tmp = callback_.lock()) { | 102 | 67.8k | tmp->call(); | 103 | 67.8k | } | 104 | 67.8k | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 67.8k | } else { | 107 | 67.8k | _process_status<ResponseType>(response_.get()); | 108 | 67.8k | } | 109 | 67.8k | } |
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEv Line | Count | Source | 97 | 70.0k | void Run() override { | 98 | 70.0k | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 70.0k | if (auto tmp = callback_.lock()) { | 102 | 70.0k | tmp->call(); | 103 | 70.0k | } | 104 | 70.0k | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 70.0k | } else { | 107 | 70.0k | _process_status<ResponseType>(response_.get()); | 108 | 70.0k | } | 109 | 70.0k | } |
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEv Line | Count | Source | 97 | 94 | void Run() override { | 98 | 94 | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 94 | if (auto tmp = callback_.lock()) { | 102 | 0 | tmp->call(); | 103 | 0 | } | 104 | 94 | if (cntl_->Failed()) { | 105 | 0 | _process_if_rpc_failed(); | 106 | 94 | } else { | 107 | 94 | _process_status<ResponseType>(response_.get()); | 108 | 94 | } | 109 | 94 | } |
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEv Line | Count | Source | 97 | 2.75M | void Run() override { | 98 | 2.75M | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 2.75M | if (auto tmp = callback_.lock()) { | 102 | 2.75M | tmp->call(); | 103 | 2.75M | } | 104 | 2.75M | if (cntl_->Failed()) { | 105 | 1 | _process_if_rpc_failed(); | 106 | 2.75M | } else { | 107 | 2.75M | _process_status<ResponseType>(response_.get()); | 108 | 2.75M | } | 109 | 2.75M | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEv |
110 | | |
111 | | // controller has to be the same lifecycle with the closure, because brpc may use |
112 | | // it in any stage of the rpc. |
113 | | std::shared_ptr<brpc::Controller> cntl_; |
114 | | // We do not know if brpc will use request or response after brpc method returns. |
115 | | // So that we need keep a shared ptr here to ensure that brpc could use req/rep |
116 | | // at any stage. |
117 | | std::shared_ptr<Request> request_; |
118 | | std::shared_ptr<ResponseType> response_; |
119 | | std::string error_msg_; |
120 | | |
121 | | protected: |
122 | 1 | virtual void _process_if_rpc_failed() { |
123 | 1 | std::string error_msg = |
124 | 1 | fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); |
125 | 1 | if (auto ctx = context_.lock(); ctx) { |
126 | 0 | ctx->cancel(Status::NetworkError(error_msg)); |
127 | 1 | } else { |
128 | 1 | LOG(WARNING) << error_msg; |
129 | 1 | } |
130 | 1 | } Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE22_process_if_rpc_failedEv _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE22_process_if_rpc_failedEv Line | Count | Source | 122 | 1 | virtual void _process_if_rpc_failed() { | 123 | 1 | std::string error_msg = | 124 | 1 | fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); | 125 | 1 | if (auto ctx = context_.lock(); ctx) { | 126 | 0 | ctx->cancel(Status::NetworkError(error_msg)); | 127 | 1 | } else { | 128 | | LOG(WARNING) << error_msg; | 129 | 1 | } | 130 | 1 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE22_process_if_rpc_failedEv |
131 | | |
132 | 15.1k | virtual void _process_if_meet_error_status(const Status& status) { |
133 | 15.1k | if (status.is<ErrorCode::END_OF_FILE>()) { |
134 | | // no need to log END_OF_FILE, reduce the unlessful log |
135 | 15.0k | return; |
136 | 15.0k | } |
137 | 34 | if (auto ctx = context_.lock(); ctx) { |
138 | 0 | ctx->cancel(status); |
139 | 34 | } else { |
140 | 34 | LOG(WARNING) << "RPC meet error status: " << status; |
141 | 34 | } |
142 | 34 | } Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Line | Count | Source | 132 | 19 | virtual void _process_if_meet_error_status(const Status& status) { | 133 | 19 | if (status.is<ErrorCode::END_OF_FILE>()) { | 134 | | // no need to log END_OF_FILE, reduce the unlessful log | 135 | 19 | return; | 136 | 19 | } | 137 | 0 | if (auto ctx = context_.lock(); ctx) { | 138 | 0 | ctx->cancel(status); | 139 | 0 | } else { | 140 | | LOG(WARNING) << "RPC meet error status: " << status; | 141 | 0 | } | 142 | 0 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE29_process_if_meet_error_statusERKNS_6StatusE _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Line | Count | Source | 132 | 48 | virtual void _process_if_meet_error_status(const Status& status) { | 133 | 48 | if (status.is<ErrorCode::END_OF_FILE>()) { | 134 | | // no need to log END_OF_FILE, reduce the unlessful log | 135 | 0 | return; | 136 | 0 | } | 137 | 48 | if (auto ctx = context_.lock(); ctx) { | 138 | 0 | ctx->cancel(status); | 139 | 48 | } else { | 140 | | LOG(WARNING) << "RPC meet error status: " << status; | 141 | 48 | } | 142 | 48 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE29_process_if_meet_error_statusERKNS_6StatusE _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Line | Count | Source | 132 | 15.0k | virtual void _process_if_meet_error_status(const Status& status) { | 133 | 15.0k | if (status.is<ErrorCode::END_OF_FILE>()) { | 134 | | // no need to log END_OF_FILE, reduce the unlessful log | 135 | 15.0k | return; | 136 | 15.0k | } | 137 | 18.4E | if (auto ctx = context_.lock(); ctx) { | 138 | 0 | ctx->cancel(status); | 139 | 18.4E | } else { | 140 | | LOG(WARNING) << "RPC meet error status: " << status; | 141 | 18.4E | } | 142 | 18.4E | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE29_process_if_meet_error_statusERKNS_6StatusE |
143 | | |
144 | | private: |
145 | | template <typename Response> |
146 | 94 | void _process_status(Response* response) {} |
147 | | |
148 | | template <HasStatus Response> |
149 | 2.86M | void _process_status(Response* response) { |
150 | 2.86M | if (Status status = Status::create(response->status()); !status.ok()) { |
151 | 15.1k | _process_if_meet_error_status(status); |
152 | 15.1k | } |
153 | 2.86M | } _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 139 | void _process_status(Response* response) { | 150 | 139 | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 0 | _process_if_meet_error_status(status); | 152 | 0 | } | 153 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 1.76k | void _process_status(Response* response) { | 150 | 1.76k | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 0 | _process_if_meet_error_status(status); | 152 | 0 | } | 153 | 1.76k | } |
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 139 | void _process_status(Response* response) { | 150 | 139 | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 0 | _process_if_meet_error_status(status); | 152 | 0 | } | 153 | 139 | } |
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 2.37k | void _process_status(Response* response) { | 150 | 2.37k | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 19 | _process_if_meet_error_status(status); | 152 | 19 | } | 153 | 2.37k | } |
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 67.7k | void _process_status(Response* response) { | 150 | 67.7k | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 0 | _process_if_meet_error_status(status); | 152 | 0 | } | 153 | 67.7k | } |
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 69.8k | void _process_status(Response* response) { | 150 | 69.8k | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 48 | _process_if_meet_error_status(status); | 152 | 48 | } | 153 | 69.8k | } |
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Line | Count | Source | 149 | 2.72M | void _process_status(Response* response) { | 150 | 2.72M | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 15.1k | _process_if_meet_error_status(status); | 152 | 15.1k | } | 153 | 2.72M | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ |
154 | | // Use a weak ptr to keep the callback, so that the callback can be deleted if the main |
155 | | // thread is freed. |
156 | | Weak callback_; |
157 | | std::weak_ptr<QueryContext> context_; |
158 | | }; |
159 | | |
160 | | } // namespace doris |