Coverage Report

Created: 2026-05-14 23:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/proto_util.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/http_method.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include "common/config.h"
24
#include "common/status.h"
25
#include "runtime/exec_env.h"
26
#include "runtime/runtime_state.h"
27
#include "util/brpc_client_cache.h"
28
#include "util/defer_op.h"
29
#include "util/network_util.h"
30
31
namespace doris {
32
33
// When the tuple/block data is greater than 2G, embed the tuple/block data
34
// and the request serialization string in the attachment, and use "http" brpc.
35
// "http"brpc requires that only one of request and attachment be non-null.
36
//
37
// 2G: In the default "baidu_std" brpcd, upper limit of the request and attachment length is 2G.
38
constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);
39
40
// Embed column_values and brpc request serialization string in controller attachment.
41
template <typename Params, typename Closure>
42
Status request_embed_attachment_contain_blockv2(Params* brpc_request,
43
                                                std::unique_ptr<Closure>& closure,
44
2
                                                bool restore_column_values = false) {
45
2
    auto* block = brpc_request->mutable_block();
46
2
    if (restore_column_values) {
47
        // Some callers borrow block storage from a shared owner. Temporarily detach the large
48
        // column_values field so the serialized request stays small, then restore it before
49
        // returning so the real owner can still be reused by later sends.
50
1
        auto* column_values = block->release_column_values();
51
1
        DORIS_CHECK(column_values != nullptr);
52
53
1
        Defer restore(
54
1
                [block, column_values] { block->set_allocated_column_values(column_values); });
proto_util_test.cpp:_ZZN5doris40request_embed_attachment_contain_blockv2INS_19PTransmitDataParamsENS_12_GLOBAL__N_120ProtoUtilTestClosureEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteIS8_EEbENKUlvE_clEv
Line
Count
Source
54
1
                [block, column_values] { block->set_allocated_column_values(column_values); });
Unexecuted instantiation: _ZZN5doris40request_embed_attachment_contain_blockv2INS_19PTransmitDataParamsENS_18AutoReleaseClosureIS1_NS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEEEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteISB_EEbENKUlvE_clEv
Unexecuted instantiation: _ZZN5doris40request_embed_attachment_contain_blockv2INS_28PTabletWriterAddBlockRequestENS_18AutoReleaseClosureIS1_NS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteISB_EEbENKUlvE_clEv
55
56
1
        return request_embed_attachmentv2(brpc_request, *column_values, closure);
57
1
    }
58
59
1
    std::string column_values = std::move(*block->mutable_column_values());
60
1
    block->mutable_column_values()->clear();
61
1
    return request_embed_attachmentv2(brpc_request, column_values, closure);
62
2
}
proto_util_test.cpp:_ZN5doris40request_embed_attachment_contain_blockv2INS_19PTransmitDataParamsENS_12_GLOBAL__N_120ProtoUtilTestClosureEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteIS8_EEb
Line
Count
Source
44
2
                                                bool restore_column_values = false) {
45
2
    auto* block = brpc_request->mutable_block();
46
2
    if (restore_column_values) {
47
        // Some callers borrow block storage from a shared owner. Temporarily detach the large
48
        // column_values field so the serialized request stays small, then restore it before
49
        // returning so the real owner can still be reused by later sends.
50
1
        auto* column_values = block->release_column_values();
51
1
        DORIS_CHECK(column_values != nullptr);
52
53
1
        Defer restore(
54
1
                [block, column_values] { block->set_allocated_column_values(column_values); });
55
56
1
        return request_embed_attachmentv2(brpc_request, *column_values, closure);
57
1
    }
58
59
1
    std::string column_values = std::move(*block->mutable_column_values());
60
1
    block->mutable_column_values()->clear();
61
1
    return request_embed_attachmentv2(brpc_request, column_values, closure);
62
2
}
Unexecuted instantiation: _ZN5doris40request_embed_attachment_contain_blockv2INS_19PTransmitDataParamsENS_18AutoReleaseClosureIS1_NS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEEEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteISB_EEb
Unexecuted instantiation: _ZN5doris40request_embed_attachment_contain_blockv2INS_28PTabletWriterAddBlockRequestENS_18AutoReleaseClosureIS1_NS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEEEENS_6StatusEPT_RSt10unique_ptrIT0_St14default_deleteISB_EEb
63
64
28
inline bool enable_http_send_block(const PTransmitDataParams& request) {
65
28
    if (!config::transfer_large_data_by_brpc) {
66
0
        return false;
67
0
    }
68
28
    if (!request.has_block() || !request.block().has_column_values()) {
69
28
        return false;
70
28
    }
71
0
    if (request.ByteSizeLong() < MIN_HTTP_BRPC_SIZE) {
72
0
        return false;
73
0
    }
74
0
    return true;
75
0
}
76
77
template <typename Closure>
78
void transmit_blockv2(PBackendService_Stub* stub, std::unique_ptr<Closure> closure) {
79
    closure->cntl_->http_request().Clear();
80
    stub->transmit_block(closure->cntl_.get(), closure->request_.get(), closure->response_.get(),
81
                         closure.get());
82
    closure.release();
83
}
84
85
template <typename Closure>
86
Status transmit_block_httpv2(ExecEnv* exec_env, std::unique_ptr<Closure> closure,
87
0
                             TNetworkAddress brpc_dest_addr, bool restore_column_values = false) {
88
0
    RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(), closure,
89
0
                                                             restore_column_values));
90
91
0
    std::string host = brpc_dest_addr.hostname;
92
0
    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
93
0
    if (dns_cache == nullptr) {
94
0
        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
95
0
    } else if (!is_valid_ip(brpc_dest_addr.hostname)) {
96
0
        Status status = dns_cache->get(brpc_dest_addr.hostname, &host);
97
0
        if (!status.ok()) {
98
0
            LOG(WARNING) << "failed to get ip from host " << brpc_dest_addr.hostname << ": "
99
0
                         << status.to_string();
100
0
            return Status::InternalError("failed to get ip from host {}", brpc_dest_addr.hostname);
101
0
        }
102
0
    }
103
    //format an ipv6 address
104
0
    std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname, brpc_dest_addr.port);
105
106
0
    std::shared_ptr<PBackendService_Stub> brpc_http_stub =
107
0
            exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url, "http");
108
0
    if (brpc_http_stub == nullptr) {
109
0
        return Status::InternalError("failed to open brpc http client to {}", brpc_url);
110
0
    }
111
0
    closure->cntl_->http_request().uri() =
112
0
            brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
113
0
    closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
114
0
    closure->cntl_->http_request().set_content_type("application/json");
115
0
    brpc_http_stub->transmit_block_by_http(closure->cntl_.get(), nullptr, closure->response_.get(),
116
0
                                           closure.get());
117
0
    closure.release();
118
119
0
    return Status::OK();
120
0
}
121
122
template <typename Params, typename Closure>
123
Status request_embed_attachmentv2(Params* brpc_request, const std::string& data,
124
2
                                  std::unique_ptr<Closure>& closure) {
125
2
    butil::IOBuf attachment;
126
127
    // step1: serialize brpc_request to string, and append to attachment.
128
2
    std::string req_str;
129
2
    if (!brpc_request->SerializeToString(&req_str)) {
130
0
        return Status::InternalError("failed to serialize the request");
131
0
    }
132
2
    int64_t req_str_size = req_str.size();
133
2
    attachment.append(&req_str_size, sizeof(req_str_size));
134
2
    attachment.append(req_str);
135
136
    // step2: append data to attachment and put it in the closure.
137
2
    int64_t data_size = data.size();
138
2
    attachment.append(&data_size, sizeof(data_size));
139
2
    try {
140
2
        attachment.append(data);
141
2
    } catch (...) {
142
0
        LOG(WARNING) << "Try to alloc " << data_size
143
0
                     << " bytes for append data to attachment failed. ";
144
0
        return Status::MemoryAllocFailed("request embed attachment failed to memcpy {} bytes",
145
0
                                         data_size);
146
0
    }
147
    // step3: attachment add to closure.
148
2
    closure->cntl_->request_attachment().swap(attachment);
149
2
    return Status::OK();
150
2
}
proto_util_test.cpp:_ZN5doris26request_embed_attachmentv2INS_19PTransmitDataParamsENS_12_GLOBAL__N_120ProtoUtilTestClosureEEENS_6StatusEPT_RKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERSt10unique_ptrIT0_St14default_deleteISG_EE
Line
Count
Source
124
2
                                  std::unique_ptr<Closure>& closure) {
125
2
    butil::IOBuf attachment;
126
127
    // step1: serialize brpc_request to string, and append to attachment.
128
2
    std::string req_str;
129
2
    if (!brpc_request->SerializeToString(&req_str)) {
130
0
        return Status::InternalError("failed to serialize the request");
131
0
    }
132
2
    int64_t req_str_size = req_str.size();
133
2
    attachment.append(&req_str_size, sizeof(req_str_size));
134
2
    attachment.append(req_str);
135
136
    // step2: append data to attachment and put it in the closure.
137
2
    int64_t data_size = data.size();
138
2
    attachment.append(&data_size, sizeof(data_size));
139
2
    try {
140
2
        attachment.append(data);
141
2
    } catch (...) {
142
0
        LOG(WARNING) << "Try to alloc " << data_size
143
0
                     << " bytes for append data to attachment failed. ";
144
0
        return Status::MemoryAllocFailed("request embed attachment failed to memcpy {} bytes",
145
0
                                         data_size);
146
0
    }
147
    // step3: attachment add to closure.
148
2
    closure->cntl_->request_attachment().swap(attachment);
149
2
    return Status::OK();
150
2
}
Unexecuted instantiation: _ZN5doris26request_embed_attachmentv2INS_19PTransmitDataParamsENS_18AutoReleaseClosureIS1_NS_20ExchangeSendCallbackINS_19PTransmitDataResultEEEEEEENS_6StatusEPT_RKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERSt10unique_ptrIT0_St14default_deleteISJ_EE
Unexecuted instantiation: _ZN5doris26request_embed_attachmentv2INS_28PTabletWriterAddBlockRequestENS_18AutoReleaseClosureIS1_NS_18WriteBlockCallbackINS_27PTabletWriterAddBlockResultEEEEEEENS_6StatusEPT_RKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERSt10unique_ptrIT0_St14default_deleteISJ_EE
151
152
// Extract the brpc request and block from the controller attachment,
153
// and put the block into the request.
154
template <typename Params>
155
2
Status attachment_extract_request_contain_block(Params* brpc_request, brpc::Controller* cntl) {
156
2
    auto block = brpc_request->mutable_block();
157
2
    return attachment_extract_request(brpc_request, cntl, block->mutable_column_values());
158
2
}
_ZN5doris40attachment_extract_request_contain_blockINS_19PTransmitDataParamsEEENS_6StatusEPT_PN4brpc10ControllerE
Line
Count
Source
155
2
Status attachment_extract_request_contain_block(Params* brpc_request, brpc::Controller* cntl) {
156
2
    auto block = brpc_request->mutable_block();
157
2
    return attachment_extract_request(brpc_request, cntl, block->mutable_column_values());
158
2
}
Unexecuted instantiation: _ZN5doris40attachment_extract_request_contain_blockINS_28PTabletWriterAddBlockRequestEEENS_6StatusEPT_PN4brpc10ControllerE
159
160
template <typename Params>
161
2
Status attachment_extract_request(Params* brpc_request, brpc::Controller* cntl, std::string* data) {
162
2
    const butil::IOBuf& io_buf = cntl->request_attachment();
163
164
    // step1: deserialize request string to brpc_request from attachment.
165
2
    int64_t req_str_size;
166
2
    io_buf.copy_to(&req_str_size, sizeof(req_str_size), 0);
167
2
    std::string req_str;
168
2
    io_buf.copy_to(&req_str, req_str_size, sizeof(req_str_size));
169
2
    brpc_request->ParseFromString(req_str);
170
171
    // step2: extract data from attachment.
172
2
    int64_t data_size;
173
2
    io_buf.copy_to(&data_size, sizeof(data_size), sizeof(req_str_size) + req_str_size);
174
2
    try {
175
2
        io_buf.copy_to(data, data_size, sizeof(data_size) + sizeof(req_str_size) + req_str_size);
176
2
    } catch (...) {
177
0
        LOG(WARNING) << "Try to alloc " << data_size
178
0
                     << " bytes for extract data from attachment failed. ";
179
0
        return Status::MemoryAllocFailed("attachment extract request failed to memcpy {} bytes",
180
0
                                         data_size);
181
0
    }
182
2
    return Status::OK();
183
2
}
_ZN5doris26attachment_extract_requestINS_19PTransmitDataParamsEEENS_6StatusEPT_PN4brpc10ControllerEPNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Line
Count
Source
161
2
Status attachment_extract_request(Params* brpc_request, brpc::Controller* cntl, std::string* data) {
162
2
    const butil::IOBuf& io_buf = cntl->request_attachment();
163
164
    // step1: deserialize request string to brpc_request from attachment.
165
2
    int64_t req_str_size;
166
2
    io_buf.copy_to(&req_str_size, sizeof(req_str_size), 0);
167
2
    std::string req_str;
168
2
    io_buf.copy_to(&req_str, req_str_size, sizeof(req_str_size));
169
2
    brpc_request->ParseFromString(req_str);
170
171
    // step2: extract data from attachment.
172
2
    int64_t data_size;
173
2
    io_buf.copy_to(&data_size, sizeof(data_size), sizeof(req_str_size) + req_str_size);
174
2
    try {
175
2
        io_buf.copy_to(data, data_size, sizeof(data_size) + sizeof(req_str_size) + req_str_size);
176
2
    } catch (...) {
177
0
        LOG(WARNING) << "Try to alloc " << data_size
178
0
                     << " bytes for extract data from attachment failed. ";
179
0
        return Status::MemoryAllocFailed("attachment extract request failed to memcpy {} bytes",
180
0
                                         data_size);
181
0
    }
182
2
    return Status::OK();
183
2
}
Unexecuted instantiation: _ZN5doris26attachment_extract_requestINS_28PTabletWriterAddBlockRequestEEENS_6StatusEPT_PN4brpc10ControllerEPNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
184
185
} // namespace doris