/root/doris/be/src/util/brpc_closure.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <google/protobuf/stubs/common.h> |
21 | | |
22 | | #include <atomic> |
23 | | #include <utility> |
24 | | |
25 | | #include "runtime/query_context.h" |
26 | | #include "runtime/thread_context.h" |
27 | | #include "service/brpc.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | | template <typename Response> |
32 | | class DummyBrpcCallback { |
33 | | ENABLE_FACTORY_CREATOR(DummyBrpcCallback); |
34 | | |
35 | | public: |
36 | | using ResponseType = Response; |
37 | 22 | DummyBrpcCallback() { |
38 | 22 | cntl_ = std::make_shared<brpc::Controller>(); |
39 | 22 | call_id_ = cntl_->call_id(); |
40 | 22 | response_ = std::make_shared<Response>(); |
41 | 22 | } Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEC2Ev _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEEC2Ev Line | Count | Source | 37 | 22 | DummyBrpcCallback() { | 38 | 22 | cntl_ = std::make_shared<brpc::Controller>(); | 39 | 22 | call_id_ = cntl_->call_id(); | 40 | 22 | response_ = std::make_shared<Response>(); | 41 | 22 | } |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEC2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEC2Ev |
42 | | |
43 | 22 | virtual ~DummyBrpcCallback() = default; Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEED2Ev _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEED2Ev Line | Count | Source | 43 | 22 | virtual ~DummyBrpcCallback() = default; |
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEED2Ev Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEED2Ev |
44 | | |
45 | 0 | virtual void call() {}Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4callEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4callEv |
46 | | |
47 | 0 | virtual void join() { brpc::Join(call_id_); }Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4joinEv Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4joinEv |
48 | | |
49 | | // according to brpc doc, we MUST save the call_id before rpc done. use this id to join. |
50 | | // if a rpc is already done then we get the id and join, it's wrong. |
51 | | brpc::CallId call_id_; |
52 | | // controller has to be the same lifecycle with the closure, because brpc may use |
53 | | // it in any stage of the rpc. |
54 | | std::shared_ptr<brpc::Controller> cntl_; |
55 | | // We do not know if brpc will use request or response after brpc method returns. |
56 | | // So that we need keep a shared ptr here to ensure that brpc could use req/rep |
57 | | // at any stage. |
58 | | std::shared_ptr<Response> response_; |
59 | | }; |
60 | | |
61 | | // The closure will be deleted after callback. |
62 | | // It could only be created by using shared ptr or unique ptr. |
63 | | // It will hold a weak ptr of T and call run of T |
64 | | // Callback() { |
65 | | // xxxx; |
66 | | // public |
67 | | // void run() { |
68 | | // logxxx |
69 | | // } |
70 | | // } |
71 | | // |
72 | | // std::shared_ptr<Callback> b; |
73 | | // |
74 | | // std::unique_ptr<AutoReleaseClosure> a(b); |
75 | | // brpc_call(a.release()); |
76 | | |
77 | | template <typename T> |
78 | | concept HasStatus = requires(T* response) { response->status(); }; |
79 | | |
80 | | template <typename Request, typename Callback> |
81 | | class AutoReleaseClosure : public google::protobuf::Closure { |
82 | | using Weak = typename std::shared_ptr<Callback>::weak_type; |
83 | | using ResponseType = typename Callback::ResponseType; |
84 | | ENABLE_FACTORY_CREATOR(AutoReleaseClosure); |
85 | | |
86 | | public: |
87 | | AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback, |
88 | | std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {}) |
89 | 28 | : request_(req), callback_(callback), context_(std::move(context)) { |
90 | 28 | this->cntl_ = callback->cntl_; |
91 | 28 | this->response_ = callback->response_; |
92 | 28 | } Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEEC2ESt10shared_ptrIS1_ES7_IS5_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Line | Count | Source | 89 | 28 | : request_(req), callback_(callback), context_(std::move(context)) { | 90 | 28 | this->cntl_ = callback->cntl_; | 91 | 28 | this->response_ = callback->response_; | 92 | 28 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEC2ESt10shared_ptrIS1_ES7_IS5_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE |
93 | | |
94 | 28 | ~AutoReleaseClosure() override = default; Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEED2Ev _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEED2Ev Line | Count | Source | 94 | 28 | ~AutoReleaseClosure() override = default; |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEED2Ev Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEED2Ev |
95 | | |
96 | | // Will delete itself |
97 | 28 | void Run() override { |
98 | 28 | Defer defer {[&]() { delete this; }};Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEvENKUlvE_clEv _ZZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEvENKUlvE_clEv Line | Count | Source | 98 | 28 | Defer defer {[&]() { delete this; }}; |
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEvENKUlvE_clEv Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEvENKUlvE_clEv |
99 | | // If lock failed, it means the callback object is deconstructed, then no need |
100 | | // to deal with the callback any more. |
101 | 28 | if (auto tmp = callback_.lock()) { |
102 | 28 | tmp->call(); |
103 | 28 | } |
104 | 28 | if (cntl_->Failed()) { |
105 | 1 | _process_if_rpc_failed(); |
106 | 27 | } else { |
107 | 27 | _process_status<ResponseType>(response_.get()); |
108 | 27 | } |
109 | 28 | } _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEv Line | Count | Source | 97 | 28 | void Run() override { | 98 | 28 | Defer defer {[&]() { delete this; }}; | 99 | | // If lock failed, it means the callback object is deconstructed, then no need | 100 | | // to deal with the callback any more. | 101 | 28 | if (auto tmp = callback_.lock()) { | 102 | 28 | tmp->call(); | 103 | 28 | } | 104 | 28 | if (cntl_->Failed()) { | 105 | 1 | _process_if_rpc_failed(); | 106 | 27 | } else { | 107 | 27 | _process_status<ResponseType>(response_.get()); | 108 | 27 | } | 109 | 28 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEv |
110 | | |
111 | | // controller has to be the same lifecycle with the closure, because brpc may use |
112 | | // it in any stage of the rpc. |
113 | | std::shared_ptr<brpc::Controller> cntl_; |
114 | | // We do not know if brpc will use request or response after brpc method returns. |
115 | | // So that we need keep a shared ptr here to ensure that brpc could use req/rep |
116 | | // at any stage. |
117 | | std::shared_ptr<Request> request_; |
118 | | std::shared_ptr<ResponseType> response_; |
119 | | std::string error_msg_; |
120 | | |
121 | | protected: |
122 | 1 | virtual void _process_if_rpc_failed() { |
123 | 1 | std::string error_msg = |
124 | 1 | fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); |
125 | 1 | if (auto ctx = context_.lock(); ctx) { |
126 | 0 | ctx->cancel(Status::NetworkError(error_msg)); |
127 | 1 | } else { |
128 | 1 | LOG(WARNING) << error_msg; |
129 | 1 | } |
130 | 1 | } _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEE22_process_if_rpc_failedEv Line | Count | Source | 122 | 1 | virtual void _process_if_rpc_failed() { | 123 | 1 | std::string error_msg = | 124 | 1 | fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_); | 125 | 1 | if (auto ctx = context_.lock(); ctx) { | 126 | 0 | ctx->cancel(Status::NetworkError(error_msg)); | 127 | 1 | } else { | 128 | | LOG(WARNING) << error_msg; | 129 | 1 | } | 130 | 1 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE22_process_if_rpc_failedEv Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE22_process_if_rpc_failedEv |
131 | | |
132 | 2 | virtual void _process_if_meet_error_status(const Status& status) { |
133 | 2 | if (status.is<ErrorCode::END_OF_FILE>()) { |
134 | | // no need to log END_OF_FILE, reduce the unlessful log |
135 | 2 | return; |
136 | 2 | } |
137 | 0 | if (auto ctx = context_.lock(); ctx) { |
138 | 0 | ctx->cancel(status); |
139 | 0 | } else { |
140 | 0 | LOG(WARNING) << "RPC meet error status: " << status; |
141 | 0 | } |
142 | 0 | } _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Line | Count | Source | 132 | 2 | virtual void _process_if_meet_error_status(const Status& status) { | 133 | 2 | if (status.is<ErrorCode::END_OF_FILE>()) { | 134 | | // no need to log END_OF_FILE, reduce the unlessful log | 135 | 2 | return; | 136 | 2 | } | 137 | 0 | if (auto ctx = context_.lock(); ctx) { | 138 | 0 | ctx->cancel(status); | 139 | 0 | } else { | 140 | | LOG(WARNING) << "RPC meet error status: " << status; | 141 | 0 | } | 142 | 0 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE29_process_if_meet_error_statusERKNS_6StatusE Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE29_process_if_meet_error_statusERKNS_6StatusE |
143 | | |
144 | | private: |
145 | | template <typename Response> |
146 | 0 | void _process_status(Response* response) {} |
147 | | |
148 | | template <HasStatus Response> |
149 | 27 | void _process_status(Response* response) { |
150 | 27 | if (Status status = Status::create(response->status()); !status.ok()) { |
151 | 2 | _process_if_meet_error_status(status); |
152 | 2 | } |
153 | 27 | } _ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_8pipeline20ExchangeSendCallbackINS_19PTransmitDataResultEEEE15_process_statusITkNS_9HasStatusES4_EEvPT_ Line | Count | Source | 149 | 27 | void _process_status(Response* response) { | 150 | 27 | if (Status status = Status::create(response->status()); !status.ok()) { | 151 | 2 | _process_if_meet_error_status(status); | 152 | 2 | } | 153 | 27 | } |
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_ Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_10vectorized18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE15_process_statusITkNS_9HasStatusES4_EEvPT_ |
154 | | // Use a weak ptr to keep the callback, so that the callback can be deleted if the main |
155 | | // thread is freed. |
156 | | Weak callback_; |
157 | | std::weak_ptr<QueryContext> context_; |
158 | | }; |
159 | | |
160 | | } // namespace doris |