/root/doris/be/src/util/thrift_util.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/DataSinks_types.h> |
21 | | #include <gen_cpp/Types_types.h> |
22 | | #include <thrift/TApplicationException.h> |
23 | | #include <thrift/transport/TBufferTransports.h> |
24 | | |
25 | | #include <cstdint> |
26 | | #include <cstring> |
27 | | #include <exception> |
28 | | #include <memory> |
29 | | #include <string> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/status.h" |
33 | | |
34 | | namespace apache::thrift::protocol { |
35 | | class TProtocol; |
36 | | class TProtocolFactory; |
37 | | } // namespace apache::thrift::protocol |
38 | | |
39 | | namespace doris { |
40 | | |
41 | | class TNetworkAddress; |
42 | | class ThriftServer; |
43 | | |
44 | | // Utility class to serialize thrift objects to a binary format. This object |
45 | | // should be reused if possible to reuse the underlying memory. |
46 | | // Note: thrift will encode NULLs into the serialized buffer so it is not valid |
47 | | // to treat it as a string. |
48 | | class ThriftSerializer { |
49 | | public: |
50 | | // If compact, the objects will be serialized using the Compact Protocol. Otherwise, |
51 | | // we'll use the binary protocol. |
52 | | // Note: the deserializer must be matching. |
53 | | ThriftSerializer(bool compact, int initial_buffer_size); |
54 | | |
55 | | // Serializes obj into result. Result will contain a copy of the memory. |
56 | | template <class T> |
57 | | Status serialize(T* obj, std::vector<uint8_t>* result) { |
58 | | uint32_t len = 0; |
59 | | uint8_t* buffer = nullptr; |
60 | | RETURN_IF_ERROR(serialize<T>(obj, &len, &buffer)); |
61 | | result->resize(len); |
62 | | memcpy(result->data(), buffer, len); |
63 | | return Status::OK(); |
64 | | } |
65 | | |
66 | | // serialize obj into a memory buffer. The result is returned in buffer/len. The |
67 | | // memory returned is owned by this object and will be invalid when another object |
68 | | // is serialized. |
69 | | template <class T> |
70 | 0 | Status serialize(T* obj, uint32_t* len, uint8_t** buffer) { |
71 | 0 | try { |
72 | 0 | _mem_buffer->resetBuffer(); |
73 | 0 | obj->write(_protocol.get()); |
74 | 0 | } catch (std::exception& e) { |
75 | 0 | return Status::InternalError("Couldn't serialize thrift object:\n{}", e.what()); |
76 | 0 | } |
77 | | |
78 | 0 | _mem_buffer->getBuffer(buffer, len); |
79 | 0 | return Status::OK(); |
80 | 0 | } Unexecuted instantiation: _ZN5doris16ThriftSerializer9serializeINS_12TResultBatchEEENS_6StatusEPT_PjPPh Unexecuted instantiation: _ZN5doris16ThriftSerializer9serializeINS_19TRuntimeProfileTreeEEENS_6StatusEPT_PjPPh Unexecuted instantiation: _ZN5doris16ThriftSerializer9serializeINS_23TJdbcExecutorCtorParamsEEENS_6StatusEPT_PjPPh Unexecuted instantiation: _ZN5doris16ThriftSerializer9serializeINS_26TJavaUdfExecutorCtorParamsEEENS_6StatusEPT_PjPPh |
81 | | |
82 | | template <class T> |
83 | | Status serialize(T* obj, std::string* result) { |
84 | | try { |
85 | | _mem_buffer->resetBuffer(); |
86 | | obj->write(_protocol.get()); |
87 | | } catch (apache::thrift::TApplicationException& e) { |
88 | | return Status::InternalError("Couldn't serialize thrift object:\n{}", e.what()); |
89 | | } |
90 | | |
91 | | *result = _mem_buffer->getBufferAsString(); |
92 | | return Status::OK(); |
93 | | } |
94 | | |
95 | | template <class T> |
96 | | Status serialize(T* obj) { |
97 | | try { |
98 | | _mem_buffer->resetBuffer(); |
99 | | obj->write(_protocol.get()); |
100 | | } catch (apache::thrift::TApplicationException& e) { |
101 | | return Status::InternalError("Couldn't serialize thrift object:\n{}", e.what()); |
102 | | } |
103 | | |
104 | | return Status::OK(); |
105 | | } |
106 | | |
107 | 0 | void get_buffer(uint8_t** buffer, uint32_t* length) { _mem_buffer->getBuffer(buffer, length); } |
108 | | |
109 | | private: |
110 | | std::shared_ptr<apache::thrift::transport::TMemoryBuffer> _mem_buffer; |
111 | | std::shared_ptr<apache::thrift::protocol::TProtocol> _protocol; |
112 | | }; |
113 | | |
114 | | class ThriftDeserializer { |
115 | | public: |
116 | | ThriftDeserializer(bool compact); |
117 | | |
118 | | private: |
119 | | std::shared_ptr<apache::thrift::protocol::TProtocolFactory> _factory; |
120 | | std::shared_ptr<apache::thrift::protocol::TProtocol> _tproto; |
121 | | }; |
122 | | |
123 | | // Utility to create a protocol (deserialization) object for 'mem'. |
124 | | std::shared_ptr<apache::thrift::protocol::TProtocol> create_deserialize_protocol( |
125 | | std::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem, bool compact); |
126 | | |
127 | | // Deserialize a thrift message from buf/len. buf/len must at least contain |
128 | | // all the bytes needed to store the thrift message. On return, len will be |
129 | | // set to the actual length of the header. |
130 | | template <class T> |
131 | | Status deserialize_thrift_msg(const uint8_t* buf, uint32_t* len, bool compact, |
132 | 59 | T* deserialized_msg) { |
133 | | // Deserialize msg bytes into c++ thrift msg using memory |
134 | | // transport. TMemoryBuffer is not const-safe, although we use it in |
135 | | // a const-safe way, so we have to explicitly cast away the const. |
136 | 59 | auto conf = std::make_shared<apache::thrift::TConfiguration>(); |
137 | | // On Thrift 0.14.0+, need use TConfiguration to raise the max message size. |
138 | | // max message size is 100MB default, so make it unlimited. |
139 | 59 | conf->setMaxMessageSize(std::numeric_limits<int>::max()); |
140 | 59 | std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport( |
141 | 59 | new apache::thrift::transport::TMemoryBuffer( |
142 | 59 | const_cast<uint8_t*>(buf), *len, |
143 | 59 | apache::thrift::transport::TMemoryBuffer::OBSERVE, conf)); |
144 | 59 | std::shared_ptr<apache::thrift::protocol::TProtocol> tproto = |
145 | 59 | create_deserialize_protocol(tmem_transport, compact); |
146 | | |
147 | 59 | try { |
148 | 59 | deserialized_msg->read(tproto.get()); |
149 | 59 | } catch (std::exception& e) { |
150 | 0 | return Status::InternalError<false>("Couldn't deserialize thrift msg:\n{}", e.what()); |
151 | 0 | } catch (...) { |
152 | | // TODO: Find the right exception for 0 bytes |
153 | 0 | return Status::InternalError("Unknown exception"); |
154 | 0 | } |
155 | | |
156 | 59 | uint32_t bytes_left = tmem_transport->available_read(); |
157 | 59 | *len = *len - bytes_left; |
158 | 59 | return Status::OK(); |
159 | 59 | } _ZN5doris22deserialize_thrift_msgIN8tparquet12FileMetaDataEEENS_6StatusEPKhPjbPT_ Line | Count | Source | 132 | 5 | T* deserialized_msg) { | 133 | | // Deserialize msg bytes into c++ thrift msg using memory | 134 | | // transport. TMemoryBuffer is not const-safe, although we use it in | 135 | | // a const-safe way, so we have to explicitly cast away the const. | 136 | 5 | auto conf = std::make_shared<apache::thrift::TConfiguration>(); | 137 | | // On Thrift 0.14.0+, need use TConfiguration to raise the max message size. | 138 | | // max message size is 100MB default, so make it unlimited. | 139 | 5 | conf->setMaxMessageSize(std::numeric_limits<int>::max()); | 140 | 5 | std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport( | 141 | 5 | new apache::thrift::transport::TMemoryBuffer( | 142 | 5 | const_cast<uint8_t*>(buf), *len, | 143 | 5 | apache::thrift::transport::TMemoryBuffer::OBSERVE, conf)); | 144 | 5 | std::shared_ptr<apache::thrift::protocol::TProtocol> tproto = | 145 | 5 | create_deserialize_protocol(tmem_transport, compact); | 146 | | | 147 | 5 | try { | 148 | 5 | deserialized_msg->read(tproto.get()); | 149 | 5 | } catch (std::exception& e) { | 150 | 0 | return Status::InternalError<false>("Couldn't deserialize thrift msg:\n{}", e.what()); | 151 | 0 | } catch (...) { | 152 | | // TODO: Find the right exception for 0 bytes | 153 | 0 | return Status::InternalError("Unknown exception"); | 154 | 0 | } | 155 | | | 156 | 5 | uint32_t bytes_left = tmem_transport->available_read(); | 157 | 5 | *len = *len - bytes_left; | 158 | 5 | return Status::OK(); | 159 | 5 | } |
Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_23TExecPlanFragmentParamsEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_27TExecPlanFragmentParamsListEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_27TPipelineFragmentParamsListEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_15TResultFileSinkEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_14TFileScanRangeEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_16TTableDescriptorEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_19TFoldConstantParamsEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_16TDescriptorTableEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_9TExprListEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_13TQueryOptionsEEENS_6StatusEPKhPjbPT_ _ZN5doris22deserialize_thrift_msgIN8tparquet10PageHeaderEEENS_6StatusEPKhPjbPT_ Line | Count | Source | 132 | 54 | T* deserialized_msg) { | 133 | | // Deserialize msg bytes into c++ thrift msg using memory | 134 | | // transport. TMemoryBuffer is not const-safe, although we use it in | 135 | | // a const-safe way, so we have to explicitly cast away the const. | 136 | 54 | auto conf = std::make_shared<apache::thrift::TConfiguration>(); | 137 | | // On Thrift 0.14.0+, need use TConfiguration to raise the max message size. | 138 | | // max message size is 100MB default, so make it unlimited. | 139 | 54 | conf->setMaxMessageSize(std::numeric_limits<int>::max()); | 140 | 54 | std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport( | 141 | 54 | new apache::thrift::transport::TMemoryBuffer( | 142 | 54 | const_cast<uint8_t*>(buf), *len, | 143 | 54 | apache::thrift::transport::TMemoryBuffer::OBSERVE, conf)); | 144 | 54 | std::shared_ptr<apache::thrift::protocol::TProtocol> tproto = | 145 | 54 | create_deserialize_protocol(tmem_transport, compact); | 146 | | | 147 | 54 | try { | 148 | 54 | deserialized_msg->read(tproto.get()); | 149 | 54 | } catch (std::exception& e) { | 150 | 0 | return Status::InternalError<false>("Couldn't deserialize thrift msg:\n{}", e.what()); | 151 | 0 | } catch (...) { | 152 | | // TODO: Find the right exception for 0 bytes | 153 | 0 | return Status::InternalError("Unknown exception"); | 154 | 0 | } | 155 | | | 156 | 54 | uint32_t bytes_left = tmem_transport->available_read(); | 157 | 54 | *len = *len - bytes_left; | 158 | 54 | return Status::OK(); | 159 | 54 | } |
Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgIN8tparquet11ColumnIndexEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgIN8tparquet11OffsetIndexEEENS_6StatusEPKhPjbPT_ Unexecuted instantiation: _ZN5doris22deserialize_thrift_msgINS_19TRuntimeProfileTreeEEENS_6StatusEPKhPjbPT_ |
160 | | |
161 | | // Redirects all Thrift logging to VLOG_CRITICAL |
162 | | void init_thrift_logging(); |
163 | | |
164 | | // Wait for a server that is running locally to start accepting |
165 | | // connections, up to a maximum timeout |
166 | | Status wait_for_local_server(const ThriftServer& server, int num_retries, int retry_interval_ms); |
167 | | |
168 | | // Wait for a server to start accepting connections, up to a maximum timeout |
169 | | Status wait_for_server(const std::string& host, int port, int num_retries, int retry_interval_ms); |
170 | | |
171 | | // Utility method to print address as address:port |
172 | | void t_network_address_to_string(const TNetworkAddress& address, std::string* out); |
173 | | |
174 | | // Compares two TNetworkAddresses alphanumerically by their host:port |
175 | | // string representation |
176 | | bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddress& b); |
177 | | |
178 | | PURE std::string to_string(const TUniqueId& id); |
179 | | |
180 | | PURE bool _has_inverted_index_v1_or_partial_update(TOlapTableSink sink); |
181 | | |
182 | | } // namespace doris |