Coverage Report

Created: 2026-03-16 11:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/row_cursor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/row_cursor.h"
19
20
#include <glog/logging.h>
21
#include <stdlib.h>
22
23
#include <algorithm>
24
#include <new>
25
#include <numeric>
26
#include <ostream>
27
28
#include "common/cast_set.h"
29
#include "storage/field.h"
30
#include "storage/olap_common.h"
31
#include "storage/olap_define.h"
32
#include "storage/tablet/tablet_schema.h"
33
#include "util/slice.h"
34
35
using std::nothrow;
36
using std::string;
37
using std::vector;
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
using namespace ErrorCode;
42
RowCursor::RowCursor()
43
2.90M
        : _fixed_len(0), _variable_len(0), _string_field_count(0), _long_text_buf(nullptr) {}
44
45
2.91M
RowCursor::~RowCursor() {
46
2.91M
    delete[] _owned_fixed_buf;
47
2.91M
    delete[] _variable_buf;
48
2.91M
    if (_string_field_count > 0 && _long_text_buf != nullptr) {
49
0
        for (int i = 0; i < _string_field_count; ++i) {
50
0
            free(_long_text_buf[i]);
51
0
        }
52
0
        free(_long_text_buf);
53
0
    }
54
2.91M
}
55
56
2.90M
Status RowCursor::_init(const std::vector<uint32_t>& columns) {
57
2.90M
    _variable_len = 0;
58
5.47M
    for (auto cid : columns) {
59
5.47M
        if (_schema->column(cid) == nullptr) {
60
0
            return Status::Error<INIT_FAILED>("Fail to malloc _fixed_buf.");
61
0
        }
62
5.47M
        _variable_len += column_schema(cid)->get_variable_len();
63
5.47M
        if (_schema->column(cid)->type() == FieldType::OLAP_FIELD_TYPE_STRING) {
64
0
            ++_string_field_count;
65
0
        }
66
5.47M
    }
67
68
2.90M
    _fixed_len = _schema->schema_size();
69
2.90M
    _fixed_buf = new (nothrow) char[_fixed_len]();
70
2.90M
    if (_fixed_buf == nullptr) {
71
0
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _fixed_buf.");
72
0
    }
73
2.90M
    _owned_fixed_buf = _fixed_buf;
74
75
2.90M
    return Status::OK();
76
2.90M
}
77
78
Status RowCursor::_init(const std::shared_ptr<Schema>& shared_schema,
79
2.91M
                        const std::vector<uint32_t>& columns) {
80
2.91M
    _schema.reset(new Schema(*shared_schema));
81
2.91M
    return _init(columns);
82
2.91M
}
83
84
Status RowCursor::_init(const std::vector<TabletColumnPtr>& schema,
85
327
                        const std::vector<uint32_t>& columns) {
86
327
    _schema.reset(new Schema(schema, columns));
87
327
    return _init(columns);
88
327
}
89
90
Status RowCursor::_init_scan_key(TabletSchemaSPtr schema,
91
2.91M
                                 const std::vector<std::string>& scan_keys) {
92
    // NOTE: cid equal with column index
93
    // Hyperloglog cannot be key, no need to handle it
94
2.91M
    _variable_len = 0;
95
5.46M
    for (auto cid : _schema->column_ids()) {
96
5.46M
        const TabletColumn& column = schema->column(cid);
97
5.46M
        FieldType type = column.type();
98
5.46M
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
99
3.42M
            _variable_len += scan_keys[cid].length();
100
3.42M
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR ||
101
2.04M
                   type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
102
1.30k
            _variable_len +=
103
1.30k
                    std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
104
2.03M
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
105
0
            ++_string_field_count;
106
0
        }
107
5.46M
    }
108
109
    // variable_len for null bytes
110
2.91M
    RETURN_IF_ERROR(_alloc_buf());
111
2.91M
    char* fixed_ptr = _fixed_buf;
112
2.91M
    char* variable_ptr = _variable_buf;
113
2.91M
    char** long_text_ptr = _long_text_buf;
114
5.44M
    for (auto cid : _schema->column_ids()) {
115
5.44M
        const TabletColumn& column = schema->column(cid);
116
5.44M
        fixed_ptr = _fixed_buf + _schema->column_offset(cid);
117
5.44M
        FieldType type = column.type();
118
5.44M
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
119
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
120
3.43M
            Slice slice(variable_ptr, scan_keys[cid].length());
121
3.43M
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
122
3.43M
            variable_ptr += scan_keys[cid].length();
123
3.43M
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR) {
124
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
125
1.31k
            size_t len = std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
126
1.31k
            Slice slice(variable_ptr, len);
127
1.31k
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
128
1.31k
            variable_ptr += len;
129
2.01M
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
130
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
131
0
            _schema->mutable_column(cid)->set_long_text_buf(long_text_ptr);
132
0
            Slice slice(*(long_text_ptr), DEFAULT_TEXT_LENGTH);
133
0
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
134
0
            ++long_text_ptr;
135
0
        }
136
5.44M
    }
137
138
2.91M
    return Status::OK();
139
2.91M
}
140
141
64
Status RowCursor::_init(TabletSchemaSPtr schema, uint32_t column_count) {
142
64
    if (column_count > schema->num_columns()) {
143
0
        return Status::Error<INVALID_ARGUMENT>(
144
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
145
0
                "column_count={}, schema.num_columns={}",
146
0
                column_count, schema->num_columns());
147
0
    }
148
64
    std::vector<uint32_t> columns;
149
2.27k
    for (auto i = 0; i < column_count; ++i) {
150
2.20k
        columns.push_back(i);
151
2.20k
    }
152
64
    RETURN_IF_ERROR(_init(schema->columns(), columns));
153
64
    return Status::OK();
154
64
}
155
156
Status RowCursor::init_scan_key(TabletSchemaSPtr schema,
157
263
                                const std::vector<std::string>& scan_keys) {
158
263
    size_t scan_key_size = scan_keys.size();
159
263
    if (scan_key_size > schema->num_columns()) {
160
0
        return Status::Error<INVALID_ARGUMENT>(
161
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
162
0
                "column_count={}, schema.num_columns={}",
163
0
                scan_key_size, schema->num_columns());
164
0
    }
165
166
263
    std::vector<uint32_t> columns(scan_key_size);
167
263
    std::iota(columns.begin(), columns.end(), 0);
168
169
263
    RETURN_IF_ERROR(_init(schema->columns(), columns));
170
171
263
    return _init_scan_key(schema, scan_keys);
172
263
}
173
174
Status RowCursor::init_scan_key(TabletSchemaSPtr schema, const std::vector<std::string>& scan_keys,
175
2.90M
                                const std::shared_ptr<Schema>& shared_schema) {
176
2.90M
    size_t scan_key_size = scan_keys.size();
177
178
2.90M
    std::vector<uint32_t> columns;
179
8.37M
    for (uint32_t i = 0; i < scan_key_size; ++i) {
180
5.46M
        columns.push_back(i);
181
5.46M
    }
182
183
2.90M
    RETURN_IF_ERROR(_init(shared_schema, columns));
184
185
2.90M
    return _init_scan_key(schema, scan_keys);
186
2.90M
}
187
188
2.90M
Status RowCursor::from_tuple(const OlapTuple& tuple) {
189
2.90M
    if (tuple.size() != _schema->num_column_ids()) {
190
0
        return Status::Error<INVALID_ARGUMENT>(
191
0
                "column count does not match. tuple_size={}, field_count={}", tuple.size(),
192
0
                _schema->num_column_ids());
193
0
    }
194
2.90M
    _row_string.resize(tuple.size());
195
196
8.36M
    for (size_t i = 0; i < tuple.size(); ++i) {
197
5.46M
        auto cid = _schema->column_ids()[i];
198
5.46M
        const StorageField* field = column_schema(cid);
199
5.46M
        if (tuple.is_null(i)) {
200
195k
            _set_null(cid);
201
195k
            continue;
202
195k
        }
203
5.27M
        _set_not_null(cid);
204
5.27M
        _row_string[i] = tuple.get_value(i);
205
5.27M
        char* buf = _cell_ptr(cid);
206
5.27M
        Status res = field->from_string(buf, tuple.get_value(i), field->get_precision(),
207
5.27M
                                        field->get_scale());
208
5.27M
        if (!res.ok()) {
209
0
            LOG(WARNING) << "fail to convert field from string. string=" << tuple.get_value(i)
210
0
                         << ", res=" << res;
211
0
            return res;
212
0
        }
213
5.27M
    }
214
215
2.90M
    return Status::OK();
216
2.90M
}
217
218
7.47M
std::string RowCursor::to_string() const {
219
7.47M
    std::string result;
220
7.47M
    size_t i = 0;
221
17.6M
    for (auto cid : _schema->column_ids()) {
222
17.6M
        if (i > 0) {
223
10.1M
            result.append("|");
224
10.1M
        }
225
226
17.6M
        result.append(std::to_string(_is_null(cid)));
227
17.6M
        result.append("&");
228
17.6M
        if (_is_null(cid)) {
229
396k
            result.append("NULL");
230
17.2M
        } else {
231
17.2M
            result.append(_row_string[i]);
232
17.2M
        }
233
17.6M
        ++i;
234
17.6M
    }
235
236
7.47M
    return result;
237
7.47M
}
238
2.90M
Status RowCursor::_alloc_buf() {
239
    // variable_len for null bytes
240
2.90M
    _variable_buf = new (nothrow) char[_variable_len]();
241
2.90M
    if (_variable_buf == nullptr) {
242
0
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _variable_buf.");
243
0
    }
244
2.90M
    if (_string_field_count > 0) {
245
0
        _long_text_buf = (char**)malloc(_string_field_count * sizeof(char*));
246
0
        if (_long_text_buf == nullptr) {
247
0
            return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
248
0
        }
249
0
        for (int i = 0; i < _string_field_count; ++i) {
250
0
            _long_text_buf[i] = (char*)malloc(DEFAULT_TEXT_LENGTH * sizeof(char));
251
0
            if (_long_text_buf[i] == nullptr) {
252
0
                return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
253
0
            }
254
0
        }
255
0
    }
256
2.90M
    return Status::OK();
257
2.90M
}
258
259
#include "common/compile_check_end.h"
260
} // namespace doris