be/src/storage/row_cursor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/row_cursor.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | #include <stdlib.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <new> |
25 | | #include <numeric> |
26 | | #include <ostream> |
27 | | |
28 | | #include "common/cast_set.h" |
29 | | #include "storage/field.h" |
30 | | #include "storage/olap_common.h" |
31 | | #include "storage/olap_define.h" |
32 | | #include "storage/tablet/tablet_schema.h" |
33 | | #include "util/slice.h" |
34 | | |
35 | | using std::nothrow; |
36 | | using std::string; |
37 | | using std::vector; |
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | using namespace ErrorCode; |
42 | | RowCursor::RowCursor() |
43 | 3.11M | : _fixed_len(0), _variable_len(0), _string_field_count(0), _long_text_buf(nullptr) {} |
44 | | |
45 | 3.11M | RowCursor::~RowCursor() { |
46 | 3.11M | delete[] _owned_fixed_buf; |
47 | 3.11M | delete[] _variable_buf; |
48 | 3.11M | if (_string_field_count > 0 && _long_text_buf != nullptr) { |
49 | 0 | for (int i = 0; i < _string_field_count; ++i) { |
50 | 0 | free(_long_text_buf[i]); |
51 | 0 | } |
52 | 0 | free(_long_text_buf); |
53 | 0 | } |
54 | 3.11M | } |
55 | | |
56 | 3.10M | Status RowCursor::_init(const std::vector<uint32_t>& columns) { |
57 | 3.10M | _variable_len = 0; |
58 | 5.73M | for (auto cid : columns) { |
59 | 5.73M | if (_schema->column(cid) == nullptr) { |
60 | 0 | return Status::Error<INIT_FAILED>("Fail to malloc _fixed_buf."); |
61 | 0 | } |
62 | 5.73M | _variable_len += column_schema(cid)->get_variable_len(); |
63 | 5.73M | if (_schema->column(cid)->type() == FieldType::OLAP_FIELD_TYPE_STRING) { |
64 | 0 | ++_string_field_count; |
65 | 0 | } |
66 | 5.73M | } |
67 | | |
68 | 3.10M | _fixed_len = _schema->schema_size(); |
69 | 3.10M | _fixed_buf = new (nothrow) char[_fixed_len](); |
70 | 3.10M | if (_fixed_buf == nullptr) { |
71 | 0 | return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _fixed_buf."); |
72 | 0 | } |
73 | 3.10M | _owned_fixed_buf = _fixed_buf; |
74 | | |
75 | 3.10M | return Status::OK(); |
76 | 3.10M | } |
77 | | |
78 | | Status RowCursor::_init(const std::shared_ptr<Schema>& shared_schema, |
79 | 3.11M | const std::vector<uint32_t>& columns) { |
80 | 3.11M | _schema.reset(new Schema(*shared_schema)); |
81 | 3.11M | return _init(columns); |
82 | 3.11M | } |
83 | | |
84 | | Status RowCursor::_init(const std::vector<TabletColumnPtr>& schema, |
85 | 327 | const std::vector<uint32_t>& columns) { |
86 | 327 | _schema.reset(new Schema(schema, columns)); |
87 | 327 | return _init(columns); |
88 | 327 | } |
89 | | |
90 | | Status RowCursor::_init_scan_key(TabletSchemaSPtr schema, |
91 | 3.11M | const std::vector<std::string>& scan_keys) { |
92 | | // NOTE: cid equal with column index |
93 | | // Hyperloglog cannot be key, no need to handle it |
94 | 3.11M | _variable_len = 0; |
95 | 5.73M | for (auto cid : _schema->column_ids()) { |
96 | 5.73M | const TabletColumn& column = schema->column(cid); |
97 | 5.73M | FieldType type = column.type(); |
98 | 5.73M | if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) { |
99 | 3.43M | _variable_len += scan_keys[cid].length(); |
100 | 3.43M | } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR || |
101 | 2.29M | type == FieldType::OLAP_FIELD_TYPE_ARRAY) { |
102 | 1.39k | _variable_len += |
103 | 1.39k | std::max(scan_keys[cid].length(), static_cast<size_t>(column.length())); |
104 | 2.29M | } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) { |
105 | 0 | ++_string_field_count; |
106 | 0 | } |
107 | 5.73M | } |
108 | | |
109 | | // variable_len for null bytes |
110 | 3.11M | RETURN_IF_ERROR(_alloc_buf()); |
111 | 3.11M | char* fixed_ptr = _fixed_buf; |
112 | 3.11M | char* variable_ptr = _variable_buf; |
113 | 3.11M | char** long_text_ptr = _long_text_buf; |
114 | 5.71M | for (auto cid : _schema->column_ids()) { |
115 | 5.71M | const TabletColumn& column = schema->column(cid); |
116 | 5.71M | fixed_ptr = _fixed_buf + _schema->column_offset(cid); |
117 | 5.71M | FieldType type = column.type(); |
118 | 5.71M | if (type == FieldType::OLAP_FIELD_TYPE_VARCHAR) { |
119 | | // Use memcpy to avoid misaligned store on fixed_ptr + 1 |
120 | 3.43M | Slice slice(variable_ptr, scan_keys[cid].length()); |
121 | 3.43M | memcpy(fixed_ptr + 1, &slice, sizeof(Slice)); |
122 | 3.43M | variable_ptr += scan_keys[cid].length(); |
123 | 3.43M | } else if (type == FieldType::OLAP_FIELD_TYPE_CHAR) { |
124 | | // Use memcpy to avoid misaligned store on fixed_ptr + 1 |
125 | 1.39k | size_t len = std::max(scan_keys[cid].length(), static_cast<size_t>(column.length())); |
126 | 1.39k | Slice slice(variable_ptr, len); |
127 | 1.39k | memcpy(fixed_ptr + 1, &slice, sizeof(Slice)); |
128 | 1.39k | variable_ptr += len; |
129 | 2.27M | } else if (type == FieldType::OLAP_FIELD_TYPE_STRING) { |
130 | | // Use memcpy to avoid misaligned store on fixed_ptr + 1 |
131 | 0 | _schema->mutable_column(cid)->set_long_text_buf(long_text_ptr); |
132 | 0 | Slice slice(*(long_text_ptr), DEFAULT_TEXT_LENGTH); |
133 | 0 | memcpy(fixed_ptr + 1, &slice, sizeof(Slice)); |
134 | 0 | ++long_text_ptr; |
135 | 0 | } |
136 | 5.71M | } |
137 | | |
138 | 3.11M | return Status::OK(); |
139 | 3.11M | } |
140 | | |
141 | 64 | Status RowCursor::_init(TabletSchemaSPtr schema, uint32_t column_count) { |
142 | 64 | if (column_count > schema->num_columns()) { |
143 | 0 | return Status::Error<INVALID_ARGUMENT>( |
144 | 0 | "Input param are invalid. Column count is bigger than num_columns of schema. " |
145 | 0 | "column_count={}, schema.num_columns={}", |
146 | 0 | column_count, schema->num_columns()); |
147 | 0 | } |
148 | 64 | std::vector<uint32_t> columns; |
149 | 2.27k | for (auto i = 0; i < column_count; ++i) { |
150 | 2.20k | columns.push_back(i); |
151 | 2.20k | } |
152 | 64 | RETURN_IF_ERROR(_init(schema->columns(), columns)); |
153 | 64 | return Status::OK(); |
154 | 64 | } |
155 | | |
156 | | Status RowCursor::init_scan_key(TabletSchemaSPtr schema, |
157 | 263 | const std::vector<std::string>& scan_keys) { |
158 | 263 | size_t scan_key_size = scan_keys.size(); |
159 | 263 | if (scan_key_size > schema->num_columns()) { |
160 | 0 | return Status::Error<INVALID_ARGUMENT>( |
161 | 0 | "Input param are invalid. Column count is bigger than num_columns of schema. " |
162 | 0 | "column_count={}, schema.num_columns={}", |
163 | 0 | scan_key_size, schema->num_columns()); |
164 | 0 | } |
165 | | |
166 | 263 | std::vector<uint32_t> columns(scan_key_size); |
167 | 263 | std::iota(columns.begin(), columns.end(), 0); |
168 | | |
169 | 263 | RETURN_IF_ERROR(_init(schema->columns(), columns)); |
170 | | |
171 | 263 | return _init_scan_key(schema, scan_keys); |
172 | 263 | } |
173 | | |
174 | | Status RowCursor::init_scan_key(TabletSchemaSPtr schema, const std::vector<std::string>& scan_keys, |
175 | 3.11M | const std::shared_ptr<Schema>& shared_schema) { |
176 | 3.11M | size_t scan_key_size = scan_keys.size(); |
177 | | |
178 | 3.11M | std::vector<uint32_t> columns; |
179 | 8.85M | for (uint32_t i = 0; i < scan_key_size; ++i) { |
180 | 5.74M | columns.push_back(i); |
181 | 5.74M | } |
182 | | |
183 | 3.11M | RETURN_IF_ERROR(_init(shared_schema, columns)); |
184 | | |
185 | 3.11M | return _init_scan_key(schema, scan_keys); |
186 | 3.11M | } |
187 | | |
188 | 3.10M | Status RowCursor::from_tuple(const OlapTuple& tuple) { |
189 | 3.10M | if (tuple.size() != _schema->num_column_ids()) { |
190 | 0 | return Status::Error<INVALID_ARGUMENT>( |
191 | 0 | "column count does not match. tuple_size={}, field_count={}", tuple.size(), |
192 | 0 | _schema->num_column_ids()); |
193 | 0 | } |
194 | 3.10M | _row_string.resize(tuple.size()); |
195 | | |
196 | 8.83M | for (size_t i = 0; i < tuple.size(); ++i) { |
197 | 5.73M | auto cid = _schema->column_ids()[i]; |
198 | 5.73M | const StorageField* field = column_schema(cid); |
199 | 5.73M | if (tuple.is_null(i)) { |
200 | 245k | _set_null(cid); |
201 | 245k | continue; |
202 | 245k | } |
203 | 5.49M | _set_not_null(cid); |
204 | 5.49M | _row_string[i] = tuple.get_value(i); |
205 | 5.49M | char* buf = _cell_ptr(cid); |
206 | 5.49M | Status res = field->from_string(buf, tuple.get_value(i), field->get_precision(), |
207 | 5.49M | field->get_scale()); |
208 | 5.49M | if (!res.ok()) { |
209 | 0 | LOG(WARNING) << "fail to convert field from string. string=" << tuple.get_value(i) |
210 | 0 | << ", res=" << res; |
211 | 0 | return res; |
212 | 0 | } |
213 | 5.49M | } |
214 | | |
215 | 3.10M | return Status::OK(); |
216 | 3.10M | } |
217 | | |
218 | 8.07M | std::string RowCursor::to_string() const { |
219 | 8.07M | std::string result; |
220 | 8.07M | size_t i = 0; |
221 | 18.7M | for (auto cid : _schema->column_ids()) { |
222 | 18.7M | if (i > 0) { |
223 | 10.6M | result.append("|"); |
224 | 10.6M | } |
225 | | |
226 | 18.7M | result.append(std::to_string(_is_null(cid))); |
227 | 18.7M | result.append("&"); |
228 | 18.7M | if (_is_null(cid)) { |
229 | 504k | result.append("NULL"); |
230 | 18.2M | } else { |
231 | 18.2M | result.append(_row_string[i]); |
232 | 18.2M | } |
233 | 18.7M | ++i; |
234 | 18.7M | } |
235 | | |
236 | 8.07M | return result; |
237 | 8.07M | } |
238 | 3.09M | Status RowCursor::_alloc_buf() { |
239 | | // variable_len for null bytes |
240 | 3.09M | _variable_buf = new (nothrow) char[_variable_len](); |
241 | 3.09M | if (_variable_buf == nullptr) { |
242 | 0 | return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _variable_buf."); |
243 | 0 | } |
244 | 3.09M | if (_string_field_count > 0) { |
245 | 0 | _long_text_buf = (char**)malloc(_string_field_count * sizeof(char*)); |
246 | 0 | if (_long_text_buf == nullptr) { |
247 | 0 | return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf."); |
248 | 0 | } |
249 | 0 | for (int i = 0; i < _string_field_count; ++i) { |
250 | 0 | _long_text_buf[i] = (char*)malloc(DEFAULT_TEXT_LENGTH * sizeof(char)); |
251 | 0 | if (_long_text_buf[i] == nullptr) { |
252 | 0 | return Status::Error<MEM_ALLOC_FAILED>("Fail to malloc _long_text_buf."); |
253 | 0 | } |
254 | 0 | } |
255 | 0 | } |
256 | 3.09M | return Status::OK(); |
257 | 3.09M | } |
258 | | |
259 | | #include "common/compile_check_end.h" |
260 | | } // namespace doris |