Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/mysql_row_buffer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/mysql_row_buffer.h"
19
20
#include <assert.h>
21
#include <fmt/compile.h>
22
#include <fmt/format.h>
23
#include <string.h>
24
#include <sys/types.h>
25
26
#include <algorithm>
27
#include <new>
28
#include <string>
29
30
#include "common/logging.h"
31
#include "core/value/decimalv2_value.h"
32
#include "core/value/ipv4_value.h"
33
#include "core/value/ipv6_value.h"
34
#include "core/value/large_int_value.h"
35
#include "core/value/timestamptz_value.h"
36
#include "core/value/vdatetime_value.h" // IWYU pragma: keep
37
#include "exprs/function/cast/cast_to_string.h"
38
#include "storage/olap_common.h"
39
#include "util/date_func.h"
40
#include "util/mysql_global.h"
41
42
namespace doris {
43
44
// Marker bytes of the MySQL length-encoded integer format produced by pack_vlen():
// the marker is followed by a 2-, 3- or 8-byte length respectively.
static constexpr uint8_t NEXT_TWO_BYTE = 252;
static constexpr uint8_t NEXT_THREE_BYTE = 253;
static constexpr uint8_t NEXT_EIGHT_BYTE = 254;
// Extra slack added on top of every reallocation in reserve(); it is meant to make sure
// the _pos pointer always stays inside the _buf allocation.
static constexpr size_t EXTRA_RESERVE_BYTE = 16;
50
51
// the first byte:
52
// <= 250: length
53
// = 251: nullptr
54
// = 252: the next two byte is length
55
// = 253: the next three byte is length
56
// = 254: the next eight byte is length
57
678
static char* pack_vlen(char* packet, uint64_t length) {
58
678
    if (length < 251ULL) {
59
662
        int1store(packet, length);
60
662
        return packet + 1;
61
662
    }
62
63
    /* 251 is reserved for nullptr */
64
16
    if (length < 65536ULL) {
65
16
        *packet++ = NEXT_TWO_BYTE;
66
16
        int2store(packet, length);
67
16
        return packet + 2;
68
16
    }
69
70
0
    if (length < 16777216ULL) {
71
0
        *packet++ = NEXT_THREE_BYTE;
72
0
        int3store(packet, length);
73
0
        return packet + 3;
74
0
    }
75
76
0
    *packet++ = NEXT_EIGHT_BYTE;
77
0
    int8store(packet, length);
78
0
    return packet + 8;
79
0
}
80
81
MysqlRowBuffer::MysqlRowBuffer()
82
220
        : _pos(_default_buf), _buf(_default_buf), _buf_size(sizeof(_default_buf)) {}
83
84
516
void MysqlRowBuffer::start_binary_row(uint64_t num_cols) {
85
516
    auto bit_fields = (num_cols + 9) / 8;
86
516
    reserve(bit_fields + 1);
87
516
    memset(_pos, 0, 1 + bit_fields);
88
516
    _pos += bit_fields + 1;
89
516
    _field_pos = 0;
90
516
}
91
92
220
// Releases the heap buffer if reserve() ever replaced the inline _default_buf.
MysqlRowBuffer::~MysqlRowBuffer() {
    if (_buf == _default_buf) {
        return;
    }
    delete[] _buf;
    _buf = _default_buf;
}
98
99
2.46k
int MysqlRowBuffer::reserve(int64_t size) {
100
2.46k
    DCHECK(size > 0);
101
102
2.46k
    int64_t need_size = size + (_pos - _buf);
103
104
2.46k
    if (need_size <= _buf_size) {
105
2.46k
        return 0;
106
2.46k
    }
107
108
0
    int64_t alloc_size = std::max(need_size, _buf_size * 2) + EXTRA_RESERVE_BYTE;
109
0
    char* new_buf = new char[alloc_size];
110
111
0
    size_t offset = _pos - _buf;
112
0
    memcpy(new_buf, _buf, offset);
113
114
0
    if (_buf != _default_buf) {
115
0
        delete[] _buf;
116
0
    }
117
118
0
    _pos = new_buf + offset;
119
0
    _buf = new_buf;
120
0
    _buf_size = alloc_size;
121
122
0
    return 0;
123
2.46k
}
124
125
0
static char* add_decimal(const DecimalV2Value& data, int round_scale, char* pos) {
126
0
    int length = data.to_buffer(pos + 1, round_scale);
127
0
    int1store(pos++, length);
128
0
    return pos + length;
129
0
}
130
131
1.26k
int MysqlRowBuffer::append(const char* data, int64_t len) {
132
1.26k
    reserve(len);
133
1.26k
    memcpy(_pos, data, len);
134
1.26k
    _pos += len;
135
1.26k
    return 0;
136
1.26k
}
137
138
0
int MysqlRowBuffer::append_var_string(const char* data, int64_t len) {
139
    /*
140
     The +9 comes from that strings of length longer than 16M require
141
     9 bytes to be stored (see net_store_length).
142
    */
143
0
    reserve(len + 9);
144
0
    _pos = pack_vlen(_pos, len);
145
0
    memcpy(_pos, data, len);
146
0
    _pos += len;
147
0
    return 0;
148
0
}
149
150
100
int MysqlRowBuffer::push_tinyint(int8_t data) {
151
100
    char buff[1];
152
100
    _field_pos++;
153
100
    int1store(buff, data);
154
100
    return append(buff, 1);
155
100
}
156
157
32
int MysqlRowBuffer::push_smallint(int16_t data) {
158
32
    char buff[2];
159
32
    _field_pos++;
160
32
    int2store(buff, data);
161
32
    return append(buff, 2);
162
32
}
163
164
367
int MysqlRowBuffer::push_int(int32_t data) {
165
367
    char buff[4];
166
367
    _field_pos++;
167
367
    int4store(buff, data);
168
367
    return append(buff, 4);
169
367
}
170
171
50
int MysqlRowBuffer::push_bigint(int64_t data) {
172
50
    char buff[8];
173
50
    _field_pos++;
174
50
    int8store(buff, data);
175
50
    return append(buff, 8);
176
50
}
177
178
0
int MysqlRowBuffer::push_unsigned_bigint(uint64_t data) {
179
0
    char buff[8];
180
0
    _field_pos++;
181
0
    int8store(buff, data);
182
0
    return append(buff, 8);
183
0
}
184
185
0
// LARGEINT is sent as a length-encoded string holding its decimal text
// (LargeIntValue::to_string).
int MysqlRowBuffer::push_largeint(int128_t data) {
    const std::string text = LargeIntValue::to_string(data);
    ++_field_pos;
    return append_var_string(text.data(), text.size());
}
191
192
99
int MysqlRowBuffer::push_float(float data) {
193
99
    char buff[4];
194
99
    _field_pos++;
195
99
    float4store(buff, data);
196
99
    return append(buff, 4);
197
99
}
198
199
28
int MysqlRowBuffer::push_double(double data) {
200
28
    char buff[8];
201
28
    _field_pos++;
202
28
    float8store(buff, data);
203
28
    return append(buff, 8);
204
28
}
205
206
// Refer to https://dev.mysql.com/doc/refman/5.7/en/time.html
207
// Encode time into MySQL binary protocol format with support for scale (microsecond precision)
208
// Time value is limited between '-838:59:59' and '838:59:59'
209
11
static int encode_binary_timev2(char* buff, double time, int scale) {
210
    // Check if scale is valid (0 to 6)
211
11
    if (scale < 0 || scale > 6) {
212
0
        return -1; // Return error for invalid scale
213
0
    }
214
215
11
    int pos = 0;                      // Current position in the buffer
216
11
    bool is_negative = time < 0;      // Determine if the time is negative
217
11
    double abs_time = std::abs(time); // Convert time to absolute value
218
219
    // Maximum time in microseconds: 838 hours, 59 minutes, 59 seconds
220
11
    const int64_t MAX_TIME_MICROSECONDS = (838 * 3600 + 59 * 60 + 59) * 1000000LL;
221
222
    // Convert time into microseconds and enforce range limit
223
11
    auto total_microseconds = static_cast<int64_t>(abs_time); // Total microseconds
224
11
    total_microseconds = std::min(total_microseconds, MAX_TIME_MICROSECONDS);
225
226
    // Adjust microseconds precision based on scale
227
11
    total_microseconds /= static_cast<int64_t>(std::pow(10, 6 - scale)); // Scale adjustment
228
11
    total_microseconds *= static_cast<int64_t>(std::pow(10, 6 - scale)); // Truncate extra precision
229
230
    // Extract days, hours, minutes, seconds, and microseconds
231
11
    int64_t days = total_microseconds / (3600LL * 24 * 1000000); // Calculate days
232
11
    total_microseconds %= (3600LL * 24 * 1000000);
233
234
11
    int64_t hours = total_microseconds / (3600LL * 1000000); // Remaining hours
235
11
    total_microseconds %= (3600LL * 1000000);
236
237
11
    int64_t minutes = total_microseconds / (60LL * 1000000); // Remaining minutes
238
11
    total_microseconds %= (60LL * 1000000);
239
240
11
    int64_t seconds = total_microseconds / 1000000;      // Remaining seconds
241
11
    int64_t microseconds = total_microseconds % 1000000; // Remaining microseconds
242
243
    // MySQL binary protocol rules for time encoding
244
11
    if (days == 0 && hours == 0 && minutes == 0 && seconds == 0 && microseconds == 0) {
245
1
        buff[pos++] = 0; // All zero: length is 0
246
10
    } else if (microseconds == 0) {
247
7
        buff[pos++] = 8;                                    // No microseconds: length is 8
248
7
        buff[pos++] = is_negative ? 1 : 0;                  // Sign byte
249
7
        int4store(buff + pos, static_cast<uint32_t>(days)); // Store days (4 bytes)
250
7
        pos += 4;
251
7
        buff[pos++] = static_cast<char>(hours);   // Store hours (1 byte)
252
7
        buff[pos++] = static_cast<char>(minutes); // Store minutes (1 byte)
253
7
        buff[pos++] = static_cast<char>(seconds); // Store seconds (1 byte)
254
7
    } else {
255
3
        buff[pos++] = 12;                                   // Include microseconds: length is 12
256
3
        buff[pos++] = is_negative ? 1 : 0;                  // Sign byte
257
3
        int4store(buff + pos, static_cast<uint32_t>(days)); // Store days (4 bytes)
258
3
        pos += 4;
259
3
        buff[pos++] = static_cast<char>(hours);                     // Store hours (1 byte)
260
3
        buff[pos++] = static_cast<char>(minutes);                   // Store minutes (1 byte)
261
3
        buff[pos++] = static_cast<char>(seconds);                   // Store seconds (1 byte)
262
3
        int4store(buff + pos, static_cast<uint32_t>(microseconds)); // Store microseconds (4 bytes)
263
3
        pos += 4;
264
3
    }
265
266
11
    return pos; // Return total bytes written to buffer
267
11
}
268
269
11
int MysqlRowBuffer::push_timev2(double data, int scale) {
270
11
    char buff[13];
271
11
    _field_pos++;
272
11
    int length = encode_binary_timev2(buff, data, scale);
273
11
    return append(buff, length);
274
11
}
275
276
template <typename DateType>
277
581
int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) {
278
581
    return push_datetime(data, scale);
279
581
}
_ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_11DateV2ValueINS_15DateV2ValueTypeEEEEEiRT_i
Line
Count
Source
277
216
int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) {
278
216
    return push_datetime(data, scale);
279
216
}
_ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEEEEiRT_i
Line
Count
Source
277
365
int MysqlRowBuffer::push_vec_datetime(DateType& data, int scale) {
278
365
    return push_datetime(data, scale);
279
365
}
Unexecuted instantiation: _ZN5doris14MysqlRowBuffer17push_vec_datetimeINS_16VecDateTimeValueEEEiRT_i
280
281
template <typename DateType>
282
581
int MysqlRowBuffer::push_datetime(const DateType& data, int scale) {
283
581
    char buff[12], *pos;
284
581
    size_t length;
285
581
    _field_pos++;
286
581
    pos = buff + 1;
287
288
581
    int2store(pos, data.year());
289
581
    pos[2] = (uchar)data.month();
290
581
    pos[3] = (uchar)data.day();
291
581
    pos[4] = (uchar)data.hour();
292
581
    pos[5] = (uchar)data.minute();
293
581
    pos[6] = (uchar)data.second();
294
581
    if (data.hour() || data.minute() || data.second()) {
295
275
        length = 7;
296
306
    } else if (data.year() || data.month() || data.day()) {
297
306
        length = 4;
298
306
    } else {
299
0
        length = 0;
300
0
    }
301
    if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> ||
302
581
                  std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) {
303
581
        if (scale > 0 || data.microsecond()) {
304
194
            int4store(pos + 7, data.microsecond());
305
194
            length = 11;
306
194
        }
307
581
    }
308
309
581
    buff[0] = (char)length; // Length is stored first
310
581
    return append(buff, length + 1);
311
581
}
_ZN5doris14MysqlRowBuffer13push_datetimeINS_11DateV2ValueINS_15DateV2ValueTypeEEEEEiRKT_i
Line
Count
Source
282
216
int MysqlRowBuffer::push_datetime(const DateType& data, int scale) {
283
216
    char buff[12], *pos;
284
216
    size_t length;
285
216
    _field_pos++;
286
216
    pos = buff + 1;
287
288
216
    int2store(pos, data.year());
289
216
    pos[2] = (uchar)data.month();
290
216
    pos[3] = (uchar)data.day();
291
216
    pos[4] = (uchar)data.hour();
292
216
    pos[5] = (uchar)data.minute();
293
216
    pos[6] = (uchar)data.second();
294
216
    if (data.hour() || data.minute() || data.second()) {
295
0
        length = 7;
296
216
    } else if (data.year() || data.month() || data.day()) {
297
216
        length = 4;
298
216
    } else {
299
0
        length = 0;
300
0
    }
301
    if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> ||
302
216
                  std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) {
303
216
        if (scale > 0 || data.microsecond()) {
304
0
            int4store(pos + 7, data.microsecond());
305
0
            length = 11;
306
0
        }
307
216
    }
308
309
216
    buff[0] = (char)length; // Length is stored first
310
216
    return append(buff, length + 1);
311
216
}
_ZN5doris14MysqlRowBuffer13push_datetimeINS_11DateV2ValueINS_19DateTimeV2ValueTypeEEEEEiRKT_i
Line
Count
Source
282
365
int MysqlRowBuffer::push_datetime(const DateType& data, int scale) {
283
365
    char buff[12], *pos;
284
365
    size_t length;
285
365
    _field_pos++;
286
365
    pos = buff + 1;
287
288
365
    int2store(pos, data.year());
289
365
    pos[2] = (uchar)data.month();
290
365
    pos[3] = (uchar)data.day();
291
365
    pos[4] = (uchar)data.hour();
292
365
    pos[5] = (uchar)data.minute();
293
365
    pos[6] = (uchar)data.second();
294
365
    if (data.hour() || data.minute() || data.second()) {
295
275
        length = 7;
296
275
    } else if (data.year() || data.month() || data.day()) {
297
90
        length = 4;
298
90
    } else {
299
0
        length = 0;
300
0
    }
301
    if constexpr (std::is_same_v<DateType, DateV2Value<DateV2ValueType>> ||
302
365
                  std::is_same_v<DateType, DateV2Value<DateTimeV2ValueType>>) {
303
365
        if (scale > 0 || data.microsecond()) {
304
194
            int4store(pos + 7, data.microsecond());
305
194
            length = 11;
306
194
        }
307
365
    }
308
309
365
    buff[0] = (char)length; // Length is stored first
310
365
    return append(buff, length + 1);
311
365
}
Unexecuted instantiation: _ZN5doris14MysqlRowBuffer13push_datetimeINS_16VecDateTimeValueEEEiRKT_i
312
313
0
int MysqlRowBuffer::push_decimal(const DecimalV2Value& data, int round_scale) {
314
0
    ++_field_pos;
315
0
    reserve(2 + MAX_DECIMAL_WIDTH);
316
0
    _pos = add_decimal(data, round_scale, _pos);
317
0
    return 0;
318
0
}
319
320
0
int MysqlRowBuffer::push_ipv4(const IPv4Value& ipv4_val) {
321
0
    auto ipv4_str = ipv4_val.to_string();
322
0
    return push_string(ipv4_str.c_str(), ipv4_str.length());
323
0
}
324
325
0
int MysqlRowBuffer::push_ipv6(const IPv6Value& ipv6_val) {
326
0
    auto ipv6_str = ipv6_val.to_string();
327
0
    return push_string(ipv6_str.c_str(), ipv6_str.length());
328
0
}
329
330
678
int MysqlRowBuffer::push_string(const char* str, int64_t length) {
331
678
    ++_field_pos;
332
678
    DCHECK(str != nullptr) << "input string is nullptr.";
333
678
    reserve(9 + length);
334
678
    _pos = pack_vlen(_pos, length);
335
678
    memcpy(_pos, str, length);
336
678
    _pos += length;
337
678
    return 0;
338
678
}
339
340
int MysqlRowBuffer::push_timestamptz(const TimestampTzValue& tz,
341
49
                                     const cctz::time_zone& local_time_zone, int scale) {
342
49
    auto tz_str = tz.to_string(local_time_zone, scale);
343
49
    return push_string(tz_str.c_str(), tz_str.length());
344
49
}
345
346
209
int MysqlRowBuffer::push_null() {
347
209
    uint offset = (_field_pos + 2) / 8 + 1;
348
209
    uint bit = (1 << ((_field_pos + 2) & 7));
349
    /* Room for this as it's allocated start_binary_row*/
350
209
    char* to = _buf + offset;
351
209
    *to = (char)((uchar)*to | (uchar)bit);
352
209
    _field_pos++;
353
209
    return 0;
354
209
}
355
356
// Explicit instantiations of push_vec_datetime for every supported date value type
// (the template is defined in this file only).
template int MysqlRowBuffer::push_vec_datetime<VecDateTimeValue>(VecDateTimeValue& value,
                                                                 int scale);
template int MysqlRowBuffer::push_vec_datetime<DateV2Value<DateTimeV2ValueType>>(
        DateV2Value<DateTimeV2ValueType>& value, int scale);
template int MysqlRowBuffer::push_vec_datetime<DateV2Value<DateV2ValueType>>(
        DateV2Value<DateV2ValueType>& value, int scale);
362
363
} // namespace doris