Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/row_cursor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/row_cursor.h"
19
20
#include <glog/logging.h>
21
#include <stdlib.h>
22
23
#include <algorithm>
24
#include <new>
25
#include <numeric>
26
#include <ostream>
27
28
#include "common/cast_set.h"
29
#include "storage/field.h"
30
#include "storage/olap_common.h"
31
#include "storage/olap_define.h"
32
#include "storage/tablet/tablet_schema.h"
33
#include "util/slice.h"
34
35
using std::nothrow;
36
using std::string;
37
using std::vector;
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
using namespace ErrorCode;
42
RowCursor::RowCursor()
43
3.11M
        : _fixed_len(0), _variable_len(0), _string_field_count(0), _long_text_buf(nullptr) {}
44
45
3.11M
RowCursor::~RowCursor() {
46
3.11M
    delete[] _owned_fixed_buf;
47
3.11M
    delete[] _variable_buf;
48
3.11M
    if (_string_field_count > 0 && _long_text_buf != nullptr) {
49
0
        for (int i = 0; i < _string_field_count; ++i) {
50
0
            free(_long_text_buf[i]);
51
0
        }
52
0
        free(_long_text_buf);
53
0
    }
54
3.11M
}
55
56
3.10M
Status RowCursor::_init(const std::vector<uint32_t>& columns) {
57
3.10M
    _variable_len = 0;
58
5.73M
    for (auto cid : columns) {
59
5.73M
        if (_schema->column(cid) == nullptr) {
60
0
            return Status::Error<INIT_FAILED>("Fail to malloc _fixed_buf.");
61
0
        }
62
5.73M
        _variable_len += column_schema(cid)->get_variable_len();
63
5.73M
        if (_schema->column(cid)->type() == FieldType::OLAP_FIELD_TYPE_STRING) {
64
0
            ++_string_field_count;
65
0
        }
66
5.73M
    }
67
68
3.10M
    _fixed_len = _schema->schema_size();
69
3.10M
    _fixed_buf = new (nothrow) char[_fixed_len]();
70
3.10M
    if (_fixed_buf == nullptr) {
71
0
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _fixed_buf.");
72
0
    }
73
3.10M
    _owned_fixed_buf = _fixed_buf;
74
75
3.10M
    return Status::OK();
76
3.10M
}
77
78
Status RowCursor::_init(const std::shared_ptr<Schema>& shared_schema,
79
3.11M
                        const std::vector<uint32_t>& columns) {
80
3.11M
    _schema.reset(new Schema(*shared_schema));
81
3.11M
    return _init(columns);
82
3.11M
}
83
84
Status RowCursor::_init(const std::vector<TabletColumnPtr>& schema,
85
327
                        const std::vector<uint32_t>& columns) {
86
327
    _schema.reset(new Schema(schema, columns));
87
327
    return _init(columns);
88
327
}
89
90
Status RowCursor::_init_scan_key(TabletSchemaSPtr schema,
91
3.11M
                                 const std::vector<std::string>& scan_keys) {
92
    // NOTE: cid equal with column index
93
    // Hyperloglog cannot be key, no need to handle it
94
3.11M
    _variable_len = 0;
95
5.73M
    for (auto cid : _schema->column_ids()) {
96
5.73M
        const TabletColumn& column = schema->column(cid);
97
5.73M
        FieldType type = column.type();
98
5.73M
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
99
3.43M
            _variable_len += scan_keys[cid].length();
100
3.43M
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR ||
101
2.29M
                   type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
102
1.39k
            _variable_len +=
103
1.39k
                    std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
104
2.29M
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
105
0
            ++_string_field_count;
106
0
        }
107
5.73M
    }
108
109
    // variable_len for null bytes
110
3.11M
    RETURN_IF_ERROR(_alloc_buf());
111
3.11M
    char* fixed_ptr = _fixed_buf;
112
3.11M
    char* variable_ptr = _variable_buf;
113
3.11M
    char** long_text_ptr = _long_text_buf;
114
5.71M
    for (auto cid : _schema->column_ids()) {
115
5.71M
        const TabletColumn& column = schema->column(cid);
116
5.71M
        fixed_ptr = _fixed_buf + _schema->column_offset(cid);
117
5.71M
        FieldType type = column.type();
118
5.71M
        if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) {
119
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
120
3.43M
            Slice slice(variable_ptr, scan_keys[cid].length());
121
3.43M
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
122
3.43M
            variable_ptr += scan_keys[cid].length();
123
3.43M
        } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR) {
124
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
125
1.39k
            size_t len = std::max(scan_keys[cid].length(), static_cast<size_t>(column.length()));
126
1.39k
            Slice slice(variable_ptr, len);
127
1.39k
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
128
1.39k
            variable_ptr += len;
129
2.27M
        } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) {
130
            // Use memcpy to avoid misaligned store on fixed_ptr + 1
131
0
            _schema->mutable_column(cid)->set_long_text_buf(long_text_ptr);
132
0
            Slice slice(*(long_text_ptr), DEFAULT_TEXT_LENGTH);
133
0
            memcpy(fixed_ptr + 1, &slice, sizeof(Slice));
134
0
            ++long_text_ptr;
135
0
        }
136
5.71M
    }
137
138
3.11M
    return Status::OK();
139
3.11M
}
140
141
64
Status RowCursor::_init(TabletSchemaSPtr schema, uint32_t column_count) {
142
64
    if (column_count > schema->num_columns()) {
143
0
        return Status::Error<INVALID_ARGUMENT>(
144
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
145
0
                "column_count={}, schema.num_columns={}",
146
0
                column_count, schema->num_columns());
147
0
    }
148
64
    std::vector<uint32_t> columns;
149
2.27k
    for (auto i = 0; i < column_count; ++i) {
150
2.20k
        columns.push_back(i);
151
2.20k
    }
152
64
    RETURN_IF_ERROR(_init(schema->columns(), columns));
153
64
    return Status::OK();
154
64
}
155
156
Status RowCursor::init_scan_key(TabletSchemaSPtr schema,
157
263
                                const std::vector<std::string>& scan_keys) {
158
263
    size_t scan_key_size = scan_keys.size();
159
263
    if (scan_key_size > schema->num_columns()) {
160
0
        return Status::Error<INVALID_ARGUMENT>(
161
0
                "Input param are invalid. Column count is bigger than num_columns of schema. "
162
0
                "column_count={}, schema.num_columns={}",
163
0
                scan_key_size, schema->num_columns());
164
0
    }
165
166
263
    std::vector<uint32_t> columns(scan_key_size);
167
263
    std::iota(columns.begin(), columns.end(), 0);
168
169
263
    RETURN_IF_ERROR(_init(schema->columns(), columns));
170
171
263
    return _init_scan_key(schema, scan_keys);
172
263
}
173
174
Status RowCursor::init_scan_key(TabletSchemaSPtr schema, const std::vector<std::string>& scan_keys,
175
3.11M
                                const std::shared_ptr<Schema>& shared_schema) {
176
3.11M
    size_t scan_key_size = scan_keys.size();
177
178
3.11M
    std::vector<uint32_t> columns;
179
8.85M
    for (uint32_t i = 0; i < scan_key_size; ++i) {
180
5.74M
        columns.push_back(i);
181
5.74M
    }
182
183
3.11M
    RETURN_IF_ERROR(_init(shared_schema, columns));
184
185
3.11M
    return _init_scan_key(schema, scan_keys);
186
3.11M
}
187
188
3.10M
Status RowCursor::from_tuple(const OlapTuple& tuple) {
189
3.10M
    if (tuple.size() != _schema->num_column_ids()) {
190
0
        return Status::Error<INVALID_ARGUMENT>(
191
0
                "column count does not match. tuple_size={}, field_count={}", tuple.size(),
192
0
                _schema->num_column_ids());
193
0
    }
194
3.10M
    _row_string.resize(tuple.size());
195
196
8.83M
    for (size_t i = 0; i < tuple.size(); ++i) {
197
5.73M
        auto cid = _schema->column_ids()[i];
198
5.73M
        const StorageField* field = column_schema(cid);
199
5.73M
        if (tuple.is_null(i)) {
200
245k
            _set_null(cid);
201
245k
            continue;
202
245k
        }
203
5.49M
        _set_not_null(cid);
204
5.49M
        _row_string[i] = tuple.get_value(i);
205
5.49M
        char* buf = _cell_ptr(cid);
206
5.49M
        Status res = field->from_string(buf, tuple.get_value(i), field->get_precision(),
207
5.49M
                                        field->get_scale());
208
5.49M
        if (!res.ok()) {
209
0
            LOG(WARNING) << "fail to convert field from string. string=" << tuple.get_value(i)
210
0
                         << ", res=" << res;
211
0
            return res;
212
0
        }
213
5.49M
    }
214
215
3.10M
    return Status::OK();
216
3.10M
}
217
218
8.07M
std::string RowCursor::to_string() const {
219
8.07M
    std::string result;
220
8.07M
    size_t i = 0;
221
18.7M
    for (auto cid : _schema->column_ids()) {
222
18.7M
        if (i > 0) {
223
10.6M
            result.append("|");
224
10.6M
        }
225
226
18.7M
        result.append(std::to_string(_is_null(cid)));
227
18.7M
        result.append("&");
228
18.7M
        if (_is_null(cid)) {
229
504k
            result.append("NULL");
230
18.2M
        } else {
231
18.2M
            result.append(_row_string[i]);
232
18.2M
        }
233
18.7M
        ++i;
234
18.7M
    }
235
236
8.07M
    return result;
237
8.07M
}
238
3.09M
Status RowCursor::_alloc_buf() {
239
    // variable_len for null bytes
240
3.09M
    _variable_buf = new (nothrow) char[_variable_len]();
241
3.09M
    if (_variable_buf == nullptr) {
242
0
        return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _variable_buf.");
243
0
    }
244
3.09M
    if (_string_field_count > 0) {
245
0
        _long_text_buf = (char**)malloc(_string_field_count * sizeof(char*));
246
0
        if (_long_text_buf == nullptr) {
247
0
            return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
248
0
        }
249
0
        for (int i = 0; i < _string_field_count; ++i) {
250
0
            _long_text_buf[i] = (char*)malloc(DEFAULT_TEXT_LENGTH * sizeof(char));
251
0
            if (_long_text_buf[i] == nullptr) {
252
0
                return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf.");
253
0
            }
254
0
        }
255
0
    }
256
3.09M
    return Status::OK();
257
3.09M
}
258
259
#include "common/compile_check_end.h"
260
} // namespace doris