be/src/util/mysql_row_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/mysql_row_buffer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | #include <fmt/compile.h> |
22 | | #include <fmt/format.h> |
23 | | #include <string.h> |
24 | | #include <sys/types.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <new> |
28 | | #include <string> |
29 | | |
30 | | #include "common/logging.h" |
31 | | #include "core/value/decimalv2_value.h" |
32 | | #include "core/value/ipv4_value.h" |
33 | | #include "core/value/ipv6_value.h" |
34 | | #include "core/value/large_int_value.h" |
35 | | #include "core/value/timestamptz_value.h" |
36 | | #include "core/value/vdatetime_value.h" // IWYU pragma: keep |
37 | | #include "exprs/function/cast/cast_to_string.h" |
38 | | #include "storage/olap_common.h" |
39 | | #include "util/date_func.h" |
40 | | #include "util/mysql_global.h" |
41 | | |
42 | | namespace doris { |
43 | | |
// Marker bytes used by pack_vlen(): each one announces how many of the
// following bytes carry the actual length value.
static constexpr uint8_t NEXT_TWO_BYTE = 252;
static constexpr uint8_t NEXT_THREE_BYTE = 253;
static constexpr uint8_t NEXT_EIGHT_BYTE = 254;
// Slack added to every buffer allocated by reserve(); it exists to make sure
// the _pos pointer always stays inside the memory owned by _buf.
static constexpr size_t EXTRA_RESERVE_BYTE = 16;
50 | | |
51 | | // the first byte: |
52 | | // <= 250: length |
53 | | // = 251: nullptr |
54 | | // = 252: the next two byte is length |
55 | | // = 253: the next three byte is length |
56 | | // = 254: the next eight byte is length |
57 | 678 | static char* pack_vlen(char* packet, uint64_t length) { |
58 | 678 | if (length < 251ULL) { |
59 | 662 | int1store(packet, length); |
60 | 662 | return packet + 1; |
61 | 662 | } |
62 | | |
63 | | /* 251 is reserved for nullptr */ |
64 | 16 | if (length < 65536ULL) { |
65 | 16 | *packet++ = NEXT_TWO_BYTE; |
66 | 16 | int2store(packet, length); |
67 | 16 | return packet + 2; |
68 | 16 | } |
69 | | |
70 | 0 | if (length < 16777216ULL) { |
71 | 0 | *packet++ = NEXT_THREE_BYTE; |
72 | 0 | int3store(packet, length); |
73 | 0 | return packet + 3; |
74 | 0 | } |
75 | | |
76 | 0 | *packet++ = NEXT_EIGHT_BYTE; |
77 | 0 | int8store(packet, length); |
78 | 0 | return packet + 8; |
79 | 0 | } |
80 | | |
81 | | MysqlRowBuffer::MysqlRowBuffer() |
82 | 216 | : _pos(_default_buf), _buf(_default_buf), _buf_size(sizeof(_default_buf)) {} |
83 | | |
84 | 512 | void MysqlRowBuffer::start_binary_row(uint64_t num_cols) { |
85 | 512 | auto bit_fields = (num_cols + 9) / 8; |
86 | 512 | reserve(bit_fields + 1); |
87 | 512 | memset(_pos, 0, 1 + bit_fields); |
88 | 512 | _pos += bit_fields + 1; |
89 | 512 | _field_pos = 0; |
90 | 512 | } |
91 | | |
92 | 216 | MysqlRowBuffer::~MysqlRowBuffer() { |
93 | 216 | if (_buf != _default_buf) { |
94 | 0 | delete[] _buf; |
95 | 0 | _buf = _default_buf; |
96 | 0 | } |
97 | 216 | } |
98 | | |
99 | 2.45k | int MysqlRowBuffer::reserve(int64_t size) { |
100 | 2.45k | DCHECK(size > 0); |
101 | | |
102 | 2.45k | int64_t need_size = size + (_pos - _buf); |
103 | | |
104 | 2.45k | if (need_size <= _buf_size) { |
105 | 2.45k | return 0; |
106 | 2.45k | } |
107 | | |
108 | 0 | int64_t alloc_size = std::max(need_size, _buf_size * 2) + EXTRA_RESERVE_BYTE; |
109 | 0 | char* new_buf = new char[alloc_size]; |
110 | |
|
111 | 0 | size_t offset = _pos - _buf; |
112 | 0 | memcpy(new_buf, _buf, offset); |
113 | |
|
114 | 0 | if (_buf != _default_buf) { |
115 | 0 | delete[] _buf; |
116 | 0 | } |
117 | |
|
118 | 0 | _pos = new_buf + offset; |
119 | 0 | _buf = new_buf; |
120 | 0 | _buf_size = alloc_size; |
121 | |
|
122 | 0 | return 0; |
123 | 2.45k | } |
124 | | |
125 | 0 | static char* add_decimal(const DecimalV2Value& data, int round_scale, char* pos) { |
126 | 0 | int length = data.to_buffer(pos + 1, round_scale); |
127 | 0 | int1store(pos++, length); |
128 | 0 | return pos + length; |
129 | 0 | } |
130 | | |
131 | 1.26k | int MysqlRowBuffer::append(const char* data, int64_t len) { |
132 | 1.26k | reserve(len); |
133 | 1.26k | memcpy(_pos, data, len); |
134 | 1.26k | _pos += len; |
135 | 1.26k | return 0; |
136 | 1.26k | } |
137 | | |
138 | 0 | int MysqlRowBuffer::append_var_string(const char* data, int64_t len) { |
139 | | /* |
140 | | The +9 comes from that strings of length longer than 16M require |
141 | | 9 bytes to be stored (see net_store_length). |
142 | | */ |
143 | 0 | reserve(len + 9); |
144 | 0 | _pos = pack_vlen(_pos, len); |
145 | 0 | memcpy(_pos, data, len); |
146 | 0 | _pos += len; |
147 | 0 | return 0; |
148 | 0 | } |
149 | | |
150 | 100 | int MysqlRowBuffer::push_tinyint(int8_t data) { |
151 | 100 | char buff[1]; |
152 | 100 | _field_pos++; |
153 | 100 | int1store(buff, data); |
154 | 100 | return append(buff, 1); |
155 | 100 | } |
156 | | |
157 | 32 | int MysqlRowBuffer::push_smallint(int16_t data) { |
158 | 32 | char buff[2]; |
159 | 32 | _field_pos++; |
160 | 32 | int2store(buff, data); |
161 | 32 | return append(buff, 2); |
162 | 32 | } |
163 | | |
164 | 367 | int MysqlRowBuffer::push_int(int32_t data) { |
165 | 367 | char buff[4]; |
166 | 367 | _field_pos++; |
167 | 367 | int4store(buff, data); |
168 | 367 | return append(buff, 4); |
169 | 367 | } |
170 | | |
171 | 50 | int MysqlRowBuffer::push_bigint(int64_t data) { |
172 | 50 | char buff[8]; |
173 | 50 | _field_pos++; |
174 | 50 | int8store(buff, data); |
175 | 50 | return append(buff, 8); |
176 | 50 | } |
177 | | |
178 | 0 | int MysqlRowBuffer::push_unsigned_bigint(uint64_t data) { |
179 | 0 | char buff[8]; |
180 | 0 | _field_pos++; |
181 | 0 | int8store(buff, data); |
182 | 0 | return append(buff, 8); |
183 | 0 | } |
184 | | |
185 | 0 | int MysqlRowBuffer::push_largeint(int128_t data) { |
186 | | // large int as type string |
187 | 0 | std::string value = LargeIntValue::to_string(data); |
188 | 0 | _field_pos++; |
189 | 0 | return append_var_string(value.data(), value.size()); |
190 | 0 | } |
191 | | |
192 | 99 | int MysqlRowBuffer::push_float(float data) { |
193 | 99 | char buff[4]; |
194 | 99 | _field_pos++; |
195 | 99 | float4store(buff, data); |
196 | 99 | return append(buff, 4); |
197 | 99 | } |
198 | | |
199 | 28 | int MysqlRowBuffer::push_double(double data) { |
200 | 28 | char buff[8]; |
201 | 28 | _field_pos++; |
202 | 28 | float8store(buff, data); |
203 | 28 | return append(buff, 8); |
204 | 28 | } |
205 | | |
206 | | // Refer to https://dev.mysql.com/doc/refman/5.7/en/time.html |
207 | | // Encode time into MySQL binary protocol format with support for scale (microsecond precision) |
208 | | // Time value is limited between '-838:59:59' and '838:59:59' |
209 | 11 | static int encode_binary_timev2(char* buff, double time, int scale) { |
210 | | // Check if scale is valid (0 to 6) |
211 | 11 | if (scale < 0 || scale > 6) { |
212 | 0 | return -1; // Return error for invalid scale |
213 | 0 | } |
214 | | |
215 | 11 | int pos = 0; // Current position in the buffer |
216 | 11 | bool is_negative = time < 0; // Determine if the time is negative |
217 | 11 | double abs_time = std::abs(time); // Convert time to absolute value |
218 | | |
219 | | // Maximum time in microseconds: 838 hours, 59 minutes, 59 seconds |
220 | 11 | const int64_t MAX_TIME_MICROSECONDS = (838 * 3600 + 59 * 60 + 59) * 1000000LL; |
221 | | |
222 | | // Convert time into microseconds and enforce range limit |
223 | 11 | auto total_microseconds = static_cast<int64_t>(abs_time); // Total microseconds |
224 | 11 | total_microseconds = std::min(total_microseconds, MAX_TIME_MICROSECONDS); |
225 | | |
226 | | // Adjust microseconds precision based on scale |
227 | 11 | total_microseconds /= static_cast<int64_t>(std::pow(10, 6 - scale)); // Scale adjustment |
228 | 11 | total_microseconds *= static_cast<int64_t>(std::pow(10, 6 - scale)); // Truncate extra precision |
229 | | |
230 | | // Extract days, hours, minutes, seconds, and microseconds |
231 | 11 | int64_t days = total_microseconds / (3600LL * 24 * 1000000); // Calculate days |
232 | 11 | total_microseconds %= (3600LL * 24 * 1000000); |
233 | | |
234 | 11 | int64_t hours = total_microseconds / (3600LL * 1000000); // Remaining hours |
235 | 11 | total_microseconds %= (3600LL * 1000000); |
236 | | |
237 | 11 | int64_t minutes = total_microseconds / (60LL * 1000000); // Remaining minutes |
238 | 11 | total_microseconds %= (60LL * 1000000); |
239 | | |
240 | 11 | int64_t seconds = total_microseconds / 1000000; // Remaining seconds |
241 | 11 | int64_t microseconds = total_microseconds % 1000000; // Remaining microseconds |
242 | | |
243 | | // MySQL binary protocol rules for time encoding |
244 | 11 | if (days == 0 && hours == 0 && minutes == 0 && seconds == 0 && microseconds == 0) { |
245 | 1 | buff[pos++] = 0; // All zero: length is 0 |
246 | 10 | } else if (microseconds == 0) { |
247 | 7 | buff[pos++] = 8; // No microseconds: length is 8 |
248 | 7 | buff[pos++] = is_negative ? 1 : 0; // Sign byte |
249 | 7 | int4store(buff + pos, static_cast<uint32_t>(days)); // Store days (4 bytes) |
250 | 7 | pos += 4; |
251 | 7 | buff[pos++] = static_cast<char>(hours); // Store hours (1 byte) |
252 | 7 | buff[pos++] = static_cast<char>(minutes); // Store minutes (1 byte) |
253 | 7 | buff[pos++] = static_cast<char>(seconds); // Store seconds (1 byte) |
254 | 7 | } else { |
255 | 3 | buff[pos++] = 12; // Include microseconds: length is 12 |
256 | 3 | buff[pos++] = is_negative ? 1 : 0; // Sign byte |
257 | 3 | int4store(buff + pos, static_cast<uint32_t>(days)); // Store days (4 bytes) |
258 | 3 | pos += 4; |
259 | 3 | buff[pos++] = static_cast<char>(hours); // Store hours (1 byte) |
260 | 3 | buff[pos++] = static_cast<char>(minutes); // Store minutes (1 byte) |
261 | 3 | buff[pos++] = static_cast<char>(seconds); // Store seconds (1 byte) |
262 | 3 | int4store(buff + pos, static_cast<uint32_t>(microseconds)); // Store microseconds (4 bytes) |
263 | 3 | pos += 4; |
264 | 3 | } |
265 | | |
266 | 11 | return pos; // Return total bytes written to buffer |
267 | 11 | } |
268 | | |
269 | 11 | int MysqlRowBuffer::push_timev2(double data, int scale) { |
270 | 11 | char buff[13]; |
271 | 11 | _field_pos++; |
272 | 11 | int length = encode_binary_timev2(buff, data, scale); |
273 | 11 | return append(buff, length); |
274 | 11 | } |
275 | | |
276 | | template <typename DateType> |
277 | 581 | int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) { |
278 | 581 | return push_datetime(data, scale); |
279 | 581 | } _ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_11DateV2ValueINS_15DateV2ValueTypeEEEEEiRT_i Line | Count | Source | 277 | 216 | int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) { | 278 | 216 | return push_datetime(data, scale); | 279 | 216 | } |
_ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEEEEiRT_i Line | Count | Source | 277 | 365 | int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) { | 278 | 365 | return push_datetime(data, scale); | 279 | 365 | } |
Unexecuted instantiation: _ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_16VecDateTimeValueEEEiRT_i |
280 | | |
281 | | template <typename DateType> |
282 | 581 | int MysqlRowBuffer::push_datetime(const DateType& data, int scale) { |
283 | 581 | char buff[12], *pos; |
284 | 581 | size_t length; |
285 | 581 | _field_pos++; |
286 | 581 | pos = buff + 1; |
287 | | |
288 | 581 | int2store(pos, data.year()); |
289 | 581 | pos[2] = (uchar)data.month(); |
290 | 581 | pos[3] = (uchar)data.day(); |
291 | 581 | pos[4] = (uchar)data.hour(); |
292 | 581 | pos[5] = (uchar)data.minute(); |
293 | 581 | pos[6] = (uchar)data.second(); |
294 | 581 | if (data.hour() || data.minute() || data.second()) { |
295 | 275 | length = 7; |
296 | 306 | } else if (data.year() || data.month() || data.day()) { |
297 | 306 | length = 4; |
298 | 306 | } else { |
299 | 0 | length = 0; |
300 | 0 | } |
301 | | if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> || |
302 | 581 | std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) { |
303 | 581 | if (scale > 0 || data.microsecond()) { |
304 | 194 | int4store(pos + 7, data.microsecond()); |
305 | 194 | length = 11; |
306 | 194 | } |
307 | 581 | } |
308 | | |
309 | 581 | buff[0] = (char)length; // Length is stored first |
310 | 581 | return append(buff, length + 1); |
311 | 581 | } _ZN5doris14MysqlRowBuffer13push_datetimeINS_11DateV2ValueINS_15DateV2ValueTypeEEEEEiRKT_i Line | Count | Source | 282 | 216 | int MysqlRowBuffer::push_datetime(const DateType& data, int scale) { | 283 | 216 | char buff[12], *pos; | 284 | 216 | size_t length; | 285 | 216 | _field_pos++; | 286 | 216 | pos = buff + 1; | 287 | | | 288 | 216 | int2store(pos, data.year()); | 289 | 216 | pos[2] = (uchar)data.month(); | 290 | 216 | pos[3] = (uchar)data.day(); | 291 | 216 | pos[4] = (uchar)data.hour(); | 292 | 216 | pos[5] = (uchar)data.minute(); | 293 | 216 | pos[6] = (uchar)data.second(); | 294 | 216 | if (data.hour() || data.minute() || data.second()) { | 295 | 0 | length = 7; | 296 | 216 | } else if (data.year() || data.month() || data.day()) { | 297 | 216 | length = 4; | 298 | 216 | } else { | 299 | 0 | length = 0; | 300 | 0 | } | 301 | | if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> || | 302 | 216 | std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) { | 303 | 216 | if (scale > 0 || data.microsecond()) { | 304 | 0 | int4store(pos + 7, data.microsecond()); | 305 | 0 | length = 11; | 306 | 0 | } | 307 | 216 | } | 308 | | | 309 | 216 | buff[0] = (char)length; // Length is stored first | 310 | 216 | return append(buff, length + 1); | 311 | 216 | } |
_ZN5doris14MysqlRowBuffer13push_datetimeINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEEEEiRKT_i Line | Count | Source | 282 | 365 | int MysqlRowBuffer::push_datetime(const DateType& data, int scale) { | 283 | 365 | char buff[12], *pos; | 284 | 365 | size_t length; | 285 | 365 | _field_pos++; | 286 | 365 | pos = buff + 1; | 287 | | | 288 | 365 | int2store(pos, data.year()); | 289 | 365 | pos[2] = (uchar)data.month(); | 290 | 365 | pos[3] = (uchar)data.day(); | 291 | 365 | pos[4] = (uchar)data.hour(); | 292 | 365 | pos[5] = (uchar)data.minute(); | 293 | 365 | pos[6] = (uchar)data.second(); | 294 | 365 | if (data.hour() || data.minute() || data.second()) { | 295 | 275 | length = 7; | 296 | 275 | } else if (data.year() || data.month() || data.day()) { | 297 | 90 | length = 4; | 298 | 90 | } else { | 299 | 0 | length = 0; | 300 | 0 | } | 301 | | if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> || | 302 | 365 | std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) { | 303 | 365 | if (scale > 0 || data.microsecond()) { | 304 | 194 | int4store(pos + 7, data.microsecond()); | 305 | 194 | length = 11; | 306 | 194 | } | 307 | 365 | } | 308 | | | 309 | 365 | buff[0] = (char)length; // Length is stored first | 310 | 365 | return append(buff, length + 1); | 311 | 365 | } |
Unexecuted instantiation: _ZN5doris14MysqlRowBuffer13push_datetimeINS_16VecDateTimeValueEEEiRKT_i |
312 | | |
313 | 0 | int MysqlRowBuffer::push_decimal(const DecimalV2Value& data, int round_scale) { |
314 | 0 | ++_field_pos; |
315 | 0 | reserve(2 + MAX_DECIMAL_WIDTH); |
316 | 0 | _pos = add_decimal(data, round_scale, _pos); |
317 | 0 | return 0; |
318 | 0 | } |
319 | | |
320 | 0 | int MysqlRowBuffer::push_ipv4(const IPv4Value& ipv4_val) { |
321 | 0 | auto ipv4_str = ipv4_val.to_string(); |
322 | 0 | return push_string(ipv4_str.c_str(), ipv4_str.length()); |
323 | 0 | } |
324 | | |
325 | 0 | int MysqlRowBuffer::push_ipv6(const IPv6Value& ipv6_val) { |
326 | 0 | auto ipv6_str = ipv6_val.to_string(); |
327 | 0 | return push_string(ipv6_str.c_str(), ipv6_str.length()); |
328 | 0 | } |
329 | | |
330 | 678 | int MysqlRowBuffer::push_string(const char* str, int64_t length) { |
331 | 678 | ++_field_pos; |
332 | 678 | DCHECK(str != nullptr) << "input string is nullptr."; |
333 | 678 | reserve(9 + length); |
334 | 678 | _pos = pack_vlen(_pos, length); |
335 | 678 | memcpy(_pos, str, length); |
336 | 678 | _pos += length; |
337 | 678 | return 0; |
338 | 678 | } |
339 | | |
340 | | int MysqlRowBuffer::push_timestamptz(const TimestampTzValue& tz, |
341 | 49 | const cctz::time_zone& local_time_zone, int scale) { |
342 | 49 | auto tz_str = tz.to_string(local_time_zone, scale); |
343 | 49 | return push_string(tz_str.c_str(), tz_str.length()); |
344 | 49 | } |
345 | | |
346 | 209 | int MysqlRowBuffer::push_null() { |
347 | 209 | uint offset = (_field_pos + 2) / 8 + 1; |
348 | 209 | uint bit = (1 << ((_field_pos + 2) & 7)); |
349 | | /* Room for this as it's allocated start_binary_row*/ |
350 | 209 | char* to = _buf + offset; |
351 | 209 | *to = (char)((uchar)*to | (uchar)bit); |
352 | 209 | _field_pos++; |
353 | 209 | return 0; |
354 | 209 | } |
355 | | |
356 | | template int MysqlRowBuffer::push_vec_datetime<DateV2Value<DateV2ValueType>>( |
357 | | DateV2Value<DateV2ValueType>& value, int scale); |
358 | | template int MysqlRowBuffer::push_vec_datetime<DateV2Value<DateTimeV2ValueType>>( |
359 | | DateV2Value<DateTimeV2ValueType>& value, int scale); |
360 | | template int MysqlRowBuffer::push_vec_datetime<VecDateTimeValue>(VecDateTimeValue& value, |
361 | | int scale); |
362 | | |
363 | | } // namespace doris |