Coverage Report

Created: 2026-03-16 13:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/brpc_closure.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <google/protobuf/stubs/common.h>
21
22
#include <atomic>
23
#include <utility>
24
25
#include "runtime/query_context.h"
26
#include "runtime/thread_context.h"
27
#include "service/brpc.h"
28
29
namespace doris {
30
31
template <typename Response>
32
class DummyBrpcCallback {
33
    ENABLE_FACTORY_CREATOR(DummyBrpcCallback);
34
35
public:
36
    using ResponseType = Response;
37
4.06M
    DummyBrpcCallback() {
38
4.06M
        cntl_ = std::make_shared<brpc::Controller>();
39
4.06M
        call_id_ = cntl_->call_id();
40
4.06M
        response_ = std::make_shared<Response>();
41
4.06M
    }
_ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEC2Ev
Line
Count
Source
37
211
    DummyBrpcCallback() {
38
211
        cntl_ = std::make_shared<brpc::Controller>();
39
211
        call_id_ = cntl_->call_id();
40
211
        response_ = std::make_shared<Response>();
41
211
    }
_ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEEC2Ev
Line
Count
Source
37
1.51k
    DummyBrpcCallback() {
38
1.51k
        cntl_ = std::make_shared<brpc::Controller>();
39
1.51k
        call_id_ = cntl_->call_id();
40
1.51k
        response_ = std::make_shared<Response>();
41
1.51k
    }
_ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEC2Ev
Line
Count
Source
37
212
    DummyBrpcCallback() {
38
212
        cntl_ = std::make_shared<brpc::Controller>();
39
212
        call_id_ = cntl_->call_id();
40
212
        response_ = std::make_shared<Response>();
41
212
    }
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEEC2Ev
Line
Count
Source
37
4.97k
    DummyBrpcCallback() {
38
4.97k
        cntl_ = std::make_shared<brpc::Controller>();
39
4.97k
        call_id_ = cntl_->call_id();
40
4.97k
        response_ = std::make_shared<Response>();
41
4.97k
    }
_ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEEC2Ev
Line
Count
Source
37
68.4k
    DummyBrpcCallback() {
38
68.4k
        cntl_ = std::make_shared<brpc::Controller>();
39
68.4k
        call_id_ = cntl_->call_id();
40
68.4k
        response_ = std::make_shared<Response>();
41
68.4k
    }
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEC2Ev
Line
Count
Source
37
68.4k
    DummyBrpcCallback() {
38
68.4k
        cntl_ = std::make_shared<brpc::Controller>();
39
68.4k
        call_id_ = cntl_->call_id();
40
68.4k
        response_ = std::make_shared<Response>();
41
68.4k
    }
_ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEC2Ev
Line
Count
Source
37
184
    DummyBrpcCallback() {
38
184
        cntl_ = std::make_shared<brpc::Controller>();
39
184
        call_id_ = cntl_->call_id();
40
184
        response_ = std::make_shared<Response>();
41
184
    }
_ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEEC2Ev
Line
Count
Source
37
3.92M
    DummyBrpcCallback() {
38
3.92M
        cntl_ = std::make_shared<brpc::Controller>();
39
3.92M
        call_id_ = cntl_->call_id();
40
3.92M
        response_ = std::make_shared<Response>();
41
3.92M
    }
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEC2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEEC2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEC2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEC2Ev
42
43
4.10M
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEED2Ev
Line
Count
Source
43
211
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEED2Ev
Line
Count
Source
43
1.51k
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEED2Ev
Line
Count
Source
43
212
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEED2Ev
Line
Count
Source
43
4.98k
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEED2Ev
Line
Count
Source
43
68.5k
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEED2Ev
Line
Count
Source
43
68.7k
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEED2Ev
Line
Count
Source
43
184
    virtual ~DummyBrpcCallback() = default;
_ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEED2Ev
Line
Count
Source
43
3.96M
    virtual ~DummyBrpcCallback() = default;
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEED2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEED2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEED2Ev
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEED2Ev
44
45
68.6k
    virtual void call() {}
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4callEv
_ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4callEv
Line
Count
Source
45
5
    virtual void call() {}
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4callEv
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4callEv
Line
Count
Source
45
68.6k
    virtual void call() {}
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4callEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4callEv
46
47
68.8k
    virtual void join() { brpc::Join(call_id_); }
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_19PTransmitDataResultEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_22PPublishFilterResponseEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PSendFilterSizeResponseEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_20PMergeFilterResponseEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriterAddBlockResultEE4joinEv
_ZN5doris17DummyBrpcCallbackINS_23PTabletWriterOpenResultEE4joinEv
Line
Count
Source
47
68.8k
    virtual void join() { brpc::Join(call_id_); }
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_25PTabletWriterCancelResultEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_21PFetchArrowDataResultEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEE4joinEv
Unexecuted instantiation: _ZN5doris17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEE4joinEv
48
49
    // according to brpc doc, we MUST save the call_id before rpc done. use this id to join.
50
    // if a rpc is already done then we get the id and join, it's wrong.
51
    brpc::CallId call_id_;
52
    // controller has to be the same lifecycle with the closure, because brpc may use
53
    // it in any stage of the rpc.
54
    std::shared_ptr<brpc::Controller> cntl_;
55
    // We do not know if brpc will use request or response after brpc method returns.
56
    // So that we need keep a shared ptr here to ensure that brpc could use req/rep
57
    // at any stage.
58
    std::shared_ptr<Response> response_;
59
};
60
61
// The closure will be deleted after callback.
62
// It could only be created by using shared ptr or unique ptr.
63
// It will hold a weak ptr of T and call run of T
64
// Callback() {
65
//  xxxx;
66
//  public
67
//  void run() {
68
//      logxxx
69
//  }
70
//  }
71
//
72
//  std::shared_ptr<Callback> b;
73
//
74
//  std::unique_ptr<AutoReleaseClosure> a(b);
75
//  brpc_call(a.release());
76
77
template <typename T>
78
concept HasStatus = requires(T* response) { response->status(); };
79
80
template <typename Request, typename Callback>
81
class AutoReleaseClosure : public google::protobuf::Closure {
82
    using Weak = typename std::shared_ptr<Callback>::weak_type;
83
    using ResponseType = typename Callback::ResponseType;
84
    ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
85
86
public:
87
    AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
88
                       std::weak_ptr<QueryContext> context = {}, std::string_view error_msg = {})
89
4.10M
            : request_(req), callback_(callback), context_(std::move(context)) {
90
4.10M
        this->cntl_ = callback->cntl_;
91
4.10M
        this->response_ = callback->response_;
92
4.10M
    }
_ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
211
            : request_(req), callback_(callback), context_(std::move(context)) {
90
211
        this->cntl_ = callback->cntl_;
91
211
        this->response_ = callback->response_;
92
211
    }
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
1.51k
            : request_(req), callback_(callback), context_(std::move(context)) {
90
1.51k
        this->cntl_ = callback->cntl_;
91
1.51k
        this->response_ = callback->response_;
92
1.51k
    }
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
212
            : request_(req), callback_(callback), context_(std::move(context)) {
90
212
        this->cntl_ = callback->cntl_;
91
212
        this->response_ = callback->response_;
92
212
    }
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
4.98k
            : request_(req), callback_(callback), context_(std::move(context)) {
90
4.98k
        this->cntl_ = callback->cntl_;
91
4.98k
        this->response_ = callback->response_;
92
4.98k
    }
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
68.6k
            : request_(req), callback_(callback), context_(std::move(context)) {
90
68.6k
        this->cntl_ = callback->cntl_;
91
68.6k
        this->response_ = callback->response_;
92
68.6k
    }
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
70.8k
            : request_(req), callback_(callback), context_(std::move(context)) {
90
70.8k
        this->cntl_ = callback->cntl_;
91
70.8k
        this->response_ = callback->response_;
92
70.8k
    }
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
184
            : request_(req), callback_(callback), context_(std::move(context)) {
90
184
        this->cntl_ = callback->cntl_;
91
184
        this->response_ = callback->response_;
92
184
    }
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Line
Count
Source
89
3.96M
            : request_(req), callback_(callback), context_(std::move(context)) {
90
3.96M
        this->cntl_ = callback->cntl_;
91
3.96M
        this->response_ = callback->response_;
92
3.96M
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEEC2ESt10shared_ptrIS1_ES6_IS4_ESt8weak_ptrINS_12QueryContextEESt17basic_string_viewIcSt11char_traitsIcEE
93
94
4.06M
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEED2Ev
Line
Count
Source
94
211
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEED2Ev
Line
Count
Source
94
1.51k
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEED2Ev
Line
Count
Source
94
212
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEED2Ev
Line
Count
Source
94
4.97k
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEED2Ev
Line
Count
Source
94
68.6k
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEED2Ev
Line
Count
Source
94
70.7k
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEED2Ev
Line
Count
Source
94
184
    ~AutoReleaseClosure() override = default;
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEED2Ev
Line
Count
Source
94
3.91M
    ~AutoReleaseClosure() override = default;
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEED2Ev
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEED2Ev
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEED2Ev
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEED2Ev
95
96
    //  Will delete itself
97
4.08M
    void Run() override {
98
4.08M
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
211
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
1.51k
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
212
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
4.97k
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
68.6k
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
70.7k
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
184
        Defer defer {[&]() { delete this; }};
_ZZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEvENKUlvE_clEv
Line
Count
Source
98
3.93M
        Defer defer {[&]() { delete this; }};
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE3RunEvENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE3RunEvENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEvENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEvENKUlvE_clEv
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
4.09M
        if (auto tmp = callback_.lock()) {
102
4.09M
            tmp->call();
103
4.09M
        }
104
4.08M
        if (cntl_->Failed()) {
105
1
            _process_if_rpc_failed();
106
4.08M
        } else {
107
4.08M
            _process_status<ResponseType>(response_.get());
108
4.08M
        }
109
4.08M
    }
_ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE3RunEv
Line
Count
Source
97
211
    void Run() override {
98
211
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
211
        if (auto tmp = callback_.lock()) {
102
0
            tmp->call();
103
0
        }
104
211
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
211
        } else {
107
211
            _process_status<ResponseType>(response_.get());
108
211
        }
109
211
    }
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE3RunEv
Line
Count
Source
97
1.51k
    void Run() override {
98
1.51k
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
1.51k
        if (auto tmp = callback_.lock()) {
102
0
            tmp->call();
103
0
        }
104
1.51k
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
1.51k
        } else {
107
1.51k
            _process_status<ResponseType>(response_.get());
108
1.51k
        }
109
1.51k
    }
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE3RunEv
Line
Count
Source
97
212
    void Run() override {
98
212
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
212
        if (auto tmp = callback_.lock()) {
102
0
            tmp->call();
103
0
        }
104
212
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
212
        } else {
107
212
            _process_status<ResponseType>(response_.get());
108
212
        }
109
212
    }
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE3RunEv
Line
Count
Source
97
4.97k
    void Run() override {
98
4.97k
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
4.97k
        if (auto tmp = callback_.lock()) {
102
5
            tmp->call();
103
5
        }
104
4.97k
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
4.97k
        } else {
107
4.97k
            _process_status<ResponseType>(response_.get());
108
4.97k
        }
109
4.97k
    }
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE3RunEv
Line
Count
Source
97
68.6k
    void Run() override {
98
68.6k
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
68.7k
        if (auto tmp = callback_.lock()) {
102
68.7k
            tmp->call();
103
68.7k
        }
104
68.6k
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
68.6k
        } else {
107
68.6k
            _process_status<ResponseType>(response_.get());
108
68.6k
        }
109
68.6k
    }
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE3RunEv
Line
Count
Source
97
70.8k
    void Run() override {
98
70.8k
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
70.8k
        if (auto tmp = callback_.lock()) {
102
70.7k
            tmp->call();
103
70.7k
        }
104
70.8k
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
70.8k
        } else {
107
70.8k
            _process_status<ResponseType>(response_.get());
108
70.8k
        }
109
70.8k
    }
_ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE3RunEv
Line
Count
Source
97
184
    void Run() override {
98
184
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
184
        if (auto tmp = callback_.lock()) {
102
0
            tmp->call();
103
0
        }
104
184
        if (cntl_->Failed()) {
105
0
            _process_if_rpc_failed();
106
184
        } else {
107
184
            _process_status<ResponseType>(response_.get());
108
184
        }
109
184
    }
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE3RunEv
Line
Count
Source
97
3.93M
    void Run() override {
98
3.93M
        Defer defer {[&]() { delete this; }};
99
        // If lock failed, it means the callback object is deconstructed, then no need
100
        // to deal with the callback any more.
101
3.95M
        if (auto tmp = callback_.lock()) {
102
3.95M
            tmp->call();
103
3.95M
        }
104
3.93M
        if (cntl_->Failed()) {
105
1
            _process_if_rpc_failed();
106
3.93M
        } else {
107
3.93M
            _process_status<ResponseType>(response_.get());
108
3.93M
        }
109
3.93M
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE3RunEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE3RunEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE3RunEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE3RunEv
110
111
    // controller has to be the same lifecycle with the closure, because brpc may use
112
    // it in any stage of the rpc.
113
    std::shared_ptr<brpc::Controller> cntl_;
114
    // We do not know if brpc will use request or response after brpc method returns.
115
    // So that we need keep a shared ptr here to ensure that brpc could use req/rep
116
    // at any stage.
117
    std::shared_ptr<Request> request_;
118
    std::shared_ptr<ResponseType> response_;
119
    std::string error_msg_;
120
121
protected:
122
1
    virtual void _process_if_rpc_failed() {
123
1
        std::string error_msg =
124
1
                fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
125
1
        if (auto ctx = context_.lock(); ctx) {
126
0
            ctx->cancel(Status::NetworkError(error_msg));
127
1
        } else {
128
1
            LOG(WARNING) << error_msg;
129
1
        }
130
1
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE22_process_if_rpc_failedEv
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE22_process_if_rpc_failedEv
Line
Count
Source
122
1
    virtual void _process_if_rpc_failed() {
123
1
        std::string error_msg =
124
1
                fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), error_msg_);
125
1
        if (auto ctx = context_.lock(); ctx) {
126
0
            ctx->cancel(Status::NetworkError(error_msg));
127
1
        } else {
128
            LOG(WARNING) << error_msg;
129
1
        }
130
1
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE22_process_if_rpc_failedEv
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE22_process_if_rpc_failedEv
131
132
16.6k
    virtual void _process_if_meet_error_status(const Status& status) {
133
16.6k
        if (status.is<ErrorCode::END_OF_FILE>()) {
134
            // no need to log END_OF_FILE, reduce the unlessful log
135
16.5k
            return;
136
16.5k
        }
137
67
        if (auto ctx = context_.lock(); ctx) {
138
0
            ctx->cancel(status);
139
67
        } else {
140
67
            LOG(WARNING) << "RPC meet error status: " << status;
141
67
        }
142
67
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE
Line
Count
Source
132
1
    virtual void _process_if_meet_error_status(const Status& status) {
133
1
        if (status.is<ErrorCode::END_OF_FILE>()) {
134
            // no need to log END_OF_FILE, reduce the unlessful log
135
1
            return;
136
1
        }
137
0
        if (auto ctx = context_.lock(); ctx) {
138
0
            ctx->cancel(status);
139
0
        } else {
140
            LOG(WARNING) << "RPC meet error status: " << status;
141
0
        }
142
0
    }
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE29_process_if_meet_error_statusERKNS_6StatusE
Line
Count
Source
132
25
    virtual void _process_if_meet_error_status(const Status& status) {
133
25
        if (status.is<ErrorCode::END_OF_FILE>()) {
134
            // no need to log END_OF_FILE, reduce the unlessful log
135
25
            return;
136
25
        }
137
0
        if (auto ctx = context_.lock(); ctx) {
138
0
            ctx->cancel(status);
139
0
        } else {
140
            LOG(WARNING) << "RPC meet error status: " << status;
141
0
        }
142
0
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
Line
Count
Source
132
48
    virtual void _process_if_meet_error_status(const Status& status) {
133
48
        if (status.is<ErrorCode::END_OF_FILE>()) {
134
            // no need to log END_OF_FILE, reduce the unlessful log
135
0
            return;
136
0
        }
137
48
        if (auto ctx = context_.lock(); ctx) {
138
0
            ctx->cancel(status);
139
48
        } else {
140
            LOG(WARNING) << "RPC meet error status: " << status;
141
48
        }
142
48
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_26PTabletWriterCancelRequestENS_17DummyBrpcCallbackINS_25PTabletWriterCancelResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
Line
Count
Source
132
16.5k
    virtual void _process_if_meet_error_status(const Status& status) {
133
16.5k
        if (status.is<ErrorCode::END_OF_FILE>()) {
134
            // no need to log END_OF_FILE; skipping it reduces useless log output
135
16.5k
            return;
136
16.5k
        }
137
19
        if (auto ctx = context_.lock(); ctx) {
138
0
            ctx->cancel(status);
139
19
        } else {
140
            LOG(WARNING) << "RPC meet error status: " << status;
141
19
        }
142
19
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE29_process_if_meet_error_statusERKNS_6StatusE
143
144
private:
145
    template <typename Response>
146
184
    void _process_status(Response* response) {}
147
148
    template <HasStatus Response>
149
4.05M
    void _process_status(Response* response) {
150
4.05M
        if (Status status = Status::create(response->status()); !status.ok()) {
151
16.6k
            _process_if_meet_error_status(status);
152
16.6k
        }
153
4.05M
    }
_ZN5doris18AutoReleaseClosureINS_22PSyncFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSyncFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
211
    void _process_status(Response* response) {
150
211
        if (Status status = Status::create(response->status()); !status.ok()) {
151
0
            _process_if_meet_error_status(status);
152
0
        }
153
211
    }
_ZN5doris18AutoReleaseClosureINS_23PPublishFilterRequestV2ENS_17DummyBrpcCallbackINS_22PPublishFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
1.51k
    void _process_status(Response* response) {
150
1.51k
        if (Status status = Status::create(response->status()); !status.ok()) {
151
0
            _process_if_meet_error_status(status);
152
0
        }
153
1.51k
    }
_ZN5doris18AutoReleaseClosureINS_22PSendFilterSizeRequestENS_17DummyBrpcCallbackINS_23PSendFilterSizeResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
212
    void _process_status(Response* response) {
150
212
        if (Status status = Status::create(response->status()); !status.ok()) {
151
1
            _process_if_meet_error_status(status);
152
1
        }
153
212
    }
_ZN5doris18AutoReleaseClosureINS_19PMergeFilterRequestENS_17DummyBrpcCallbackINS_20PMergeFilterResponseEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
4.96k
    void _process_status(Response* response) {
150
4.96k
        if (Status status = Status::create(response->status()); !status.ok()) {
151
24
            _process_if_meet_error_status(status);
152
24
        }
153
4.96k
    }
_ZN5doris18AutoReleaseClosureINS_24PTabletWriterOpenRequestENS_17DummyBrpcCallbackINS_23PTabletWriterOpenResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
68.5k
    void _process_status(Response* response) {
150
68.5k
        if (Status status = Status::create(response->status()); !status.ok()) {
151
0
            _process_if_meet_error_status(status);
152
0
        }
153
68.5k
    }
_ZN5doris18AutoReleaseClosureINS_28PTabletWriterAddBlockRequestENS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
70.6k
    void _process_status(Response* response) {
150
70.6k
        if (Status status = Status::create(response->status()); !status.ok()) {
151
48
            _process_if_meet_error_status(status);
152
48
        }
153
70.6k
    }
_ZN5doris18AutoReleaseClosureINS_19PTransmitDataParamsENS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Line
Count
Source
149
3.90M
    void _process_status(Response* response) {
150
3.90M
        if (Status status = Status::create(response->status()); !status.ok()) {
151
16.6k
            _process_if_meet_error_status(status);
152
16.6k
        }
153
3.90M
    }
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_30PFetchArrowFlightSchemaRequestENS_17DummyBrpcCallbackINS_29PFetchArrowFlightSchemaResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_22PFetchArrowDataRequestENS_17DummyBrpcCallbackINS_21PFetchArrowDataResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_28PTabletWriteSlaveDoneRequestENS_17DummyBrpcCallbackINS_27PTabletWriteSlaveDoneResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
Unexecuted instantiation: _ZN5doris18AutoReleaseClosureINS_24PTabletWriteSlaveRequestENS_17DummyBrpcCallbackINS_23PTabletWriteSlaveResultEEEE15_process_statusITkNS_9HasStatusES3_EEvPT_
154
    // Use a weak ptr to keep the callback, so that the callback can be deleted if the main
155
    // thread is freed.
156
    Weak callback_;
157
    std::weak_ptr<QueryContext> context_;
158
};
159
160
} // namespace doris