be/src/exec/sink/vtablet_block_convertor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vtablet_block_convertor.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/FrontendService.h> |
22 | | #include <glog/logging.h> |
23 | | #include <google/protobuf/stubs/common.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <memory> |
27 | | #include <string> |
28 | | #include <unordered_map> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/compiler_util.h" // IWYU pragma: keep |
32 | | #include "common/consts.h" |
33 | | #include "common/status.h" |
34 | | #include "core/assert_cast.h" |
35 | | #include "core/binary_cast.hpp" |
36 | | #include "core/block/block.h" |
37 | | #include "core/column/column.h" |
38 | | #include "core/column/column_array.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/column/column_decimal.h" |
41 | | #include "core/column/column_map.h" |
42 | | #include "core/column/column_nullable.h" |
43 | | #include "core/column/column_string.h" |
44 | | #include "core/column/column_struct.h" |
45 | | #include "core/data_type/data_type.h" |
46 | | #include "core/data_type/data_type_array.h" |
47 | | #include "core/data_type/data_type_decimal.h" |
48 | | #include "core/data_type/data_type_factory.hpp" |
49 | | #include "core/data_type/data_type_map.h" |
50 | | #include "core/data_type/data_type_nullable.h" |
51 | | #include "core/data_type/data_type_struct.h" |
52 | | #include "core/data_type/define_primitive_type.h" |
53 | | #include "core/data_type/primitive_type.h" |
54 | | #include "core/types.h" |
55 | | #include "core/wide_integer_to_string.h" |
56 | | #include "exprs/function/function_helpers.h" |
57 | | #include "exprs/function/simple_function_factory.h" |
58 | | #include "exprs/vexpr.h" |
59 | | #include "exprs/vexpr_context.h" |
60 | | #include "runtime/descriptors.h" |
61 | | #include "runtime/runtime_state.h" |
62 | | #include "service/brpc.h" |
63 | | #include "storage/olap_common.h" |
64 | | #include "util/brpc_client_cache.h" |
65 | | #include "util/thread.h" |
66 | | |
67 | | namespace doris { |
68 | | |
69 | | // !FIXME: Here we should consider using MutableBlock, due to potential data reorganization |
70 | | Status OlapTableBlockConvertor::validate_and_convert_block(RuntimeState* state, Block* input_block, |
71 | | std::shared_ptr<Block>& block, |
72 | | VExprContextSPtrs output_vexpr_ctxs, |
73 | 9 | size_t rows, bool& has_filtered_rows) { |
74 | 9 | DCHECK(input_block->rows() > 0); |
75 | | |
76 | 9 | block = Block::create_shared(input_block->get_columns_with_type_and_name()); |
77 | 9 | if (!output_vexpr_ctxs.empty()) { |
78 | | // Do vectorized expr here to speed up load |
79 | 0 | RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs( |
80 | 0 | output_vexpr_ctxs, *input_block, block.get())); |
81 | 0 | } |
82 | | |
83 | 9 | if (_is_partial_update_and_auto_inc) { |
84 | | // If this load is a partial update and this table has an auto inc column, |
85 | | // e.g. table schema: k1, v1, v2(auto inc) |
86 | | // 1. insert columns include auto inc column |
87 | | // e.g. insert into table (k1, v2) value(a, 1); |
88 | | // we do nothing. |
89 | | // 2. insert columns do not include auto inc column |
90 | | // e.g. insert into table (k1, v1) value(a, a); |
91 | | // we need to fill auto_inc_cols by creating a new column. |
92 | 0 | if (!_auto_inc_col_idx.has_value()) { |
93 | 0 | RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows)); |
94 | 0 | } |
95 | 9 | } else if (_auto_inc_col_idx.has_value()) { |
96 | | // fill the values for auto-increment columns |
97 | 0 | DCHECK_EQ(_is_partial_update_and_auto_inc, false); |
98 | 0 | RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); |
99 | 0 | } |
100 | | |
101 | 9 | int filtered_rows = 0; |
102 | 9 | { |
103 | 9 | SCOPED_RAW_TIMER(&_validate_data_ns); |
104 | 9 | _filter_map.clear(); |
105 | 9 | _filter_map.resize(rows, 0); |
106 | 9 | auto st = _validate_data(state, block.get(), rows, filtered_rows); |
107 | 9 | _num_filtered_rows += filtered_rows; |
108 | 9 | has_filtered_rows = filtered_rows > 0; |
109 | 9 | if (!st.ok()) { |
110 | 0 | return st; |
111 | 0 | } |
112 | 9 | _convert_to_dest_desc_block(block.get()); |
113 | 9 | } |
114 | | |
115 | 0 | return Status::OK(); |
116 | 9 | } |
117 | | |
118 | | void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, |
119 | | bool is_partial_update_and_auto_inc, |
120 | 2 | int32_t auto_increment_column_unique_id) { |
121 | 2 | _batch_size = batch_size; |
122 | 2 | if (is_partial_update_and_auto_inc) { |
123 | 0 | _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc; |
124 | 0 | _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
125 | 0 | db_id, table_id, auto_increment_column_unique_id); |
126 | 0 | return; |
127 | 0 | } |
128 | 4 | for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) { |
129 | 2 | if (_output_tuple_desc->slots()[idx]->is_auto_increment()) { |
130 | 0 | _auto_inc_col_idx = idx; |
131 | 0 | _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
132 | 0 | db_id, table_id, _output_tuple_desc->slots()[idx]->col_unique_id()); |
133 | 0 | _auto_inc_id_buffer->set_batch_size_at_least(_batch_size); |
134 | 0 | break; |
135 | 0 | } |
136 | 2 | } |
137 | 2 | } |
138 | | |
139 | | template <bool is_min> |
140 | 0 | DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const DataTypePtr& type) { |
141 | 0 | std::map<std::pair<int, int>, DecimalV2Value>* pmap; |
142 | 0 | if constexpr (is_min) { |
143 | 0 | pmap = &_min_decimalv2_val; |
144 | 0 | } else { |
145 | 0 | pmap = &_max_decimalv2_val; |
146 | 0 | } |
147 | | |
148 | | // found |
149 | 0 | auto iter = pmap->find( |
150 | 0 | {remove_nullable(type)->get_precision(), remove_nullable(type)->get_scale()}); |
151 | 0 | if (iter != pmap->end()) { |
152 | 0 | return iter->second; |
153 | 0 | } |
154 | | |
155 | | // save min or max DecimalV2Value for next time |
156 | 0 | DecimalV2Value value; |
157 | 0 | if constexpr (is_min) { |
158 | 0 | value.to_min_decimal(type->get_precision(), type->get_scale()); |
159 | 0 | } else { |
160 | 0 | value.to_max_decimal(type->get_precision(), type->get_scale()); |
161 | 0 | } |
162 | 0 | pmap->emplace(std::pair<int, int> {type->get_precision(), type->get_scale()}, value); |
163 | 0 | return value; |
164 | 0 | } Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb0EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb1EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE |
165 | | |
166 | | template <typename DecimalType, bool IsMin> |
167 | 0 | DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const DataTypePtr& type) { |
168 | 0 | std::map<int, typename DecimalType::NativeType>* pmap; |
169 | 0 | if constexpr (std::is_same_v<DecimalType, Decimal32>) { |
170 | 0 | pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val; |
171 | 0 | } else if constexpr (std::is_same_v<DecimalType, Decimal64>) { |
172 | 0 | pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val; |
173 | 0 | } else if constexpr (std::is_same_v<DecimalType, Decimal128V3>) { |
174 | 0 | pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val; |
175 | 0 | } else { |
176 | 0 | pmap = IsMin ? &_min_decimal256_val : &_max_decimal256_val; |
177 | 0 | } |
178 | | |
179 | | // found |
180 | 0 | auto iter = pmap->find(type->get_precision()); |
181 | 0 | if (iter != pmap->end()) { |
182 | 0 | return DecimalType(iter->second); |
183 | 0 | } |
184 | | |
185 | 0 | DecimalType value; |
186 | 0 | if constexpr (IsMin) { |
187 | 0 | value = min_decimal_value<DecimalType::PType>(type->get_precision()); |
188 | 0 | } else { |
189 | 0 | value = max_decimal_value<DecimalType::PType>(type->get_precision()); |
190 | 0 | } |
191 | 0 | pmap->emplace(type->get_precision(), value.value); |
192 | 0 | return value; |
193 | 0 | } Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE |
194 | | |
195 | | Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, Block* block, |
196 | | const DataTypePtr& type, ColumnPtr column, |
197 | | size_t slot_index, |
198 | | fmt::memory_buffer& error_prefix, |
199 | | const size_t row_count, |
200 | 9 | IColumn::Permutation* rows) { |
201 | 9 | DCHECK((rows == nullptr) || (rows->size() == row_count)); |
202 | 9 | fmt::memory_buffer error_msg; |
203 | 9 | auto set_invalid_and_append_error_msg = [&](size_t row) { |
204 | 0 | _filter_map[row] = true; |
205 | 0 | auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; }, |
206 | 0 | [&error_prefix, &error_msg]() -> std::string { |
207 | 0 | return fmt::to_string(error_prefix) + |
208 | 0 | fmt::to_string(error_msg); |
209 | 0 | }); |
210 | 0 | error_msg.clear(); |
211 | 0 | return ret; |
212 | 0 | }; |
213 | | |
214 | 9 | const auto* column_ptr = check_and_get_column<ColumnNullable>(*column); |
215 | 9 | const auto& real_column_ptr = |
216 | 9 | column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); |
217 | 9 | const auto* null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data(); |
218 | 9 | auto need_to_validate = [](size_t j, size_t row, const std::vector<char>& filter_map, |
219 | 9 | const unsigned char* null_map) { |
220 | 0 | return !filter_map[row] && (null_map == nullptr || null_map[j] == 0); |
221 | 0 | }; |
222 | | |
223 | | // may change orig_column if substring function is performed |
224 | 9 | auto string_column_checker = [&state, &error_msg, need_to_validate, |
225 | 9 | set_invalid_and_append_error_msg]( |
226 | 9 | ColumnPtr& orig_column, const DataTypePtr& orig_type, |
227 | 9 | IColumn::Permutation* rows, |
228 | 9 | const std::vector<char>& filter_map) { |
229 | 0 | int limit = config::string_type_length_soft_limit_bytes; |
230 | 0 | int len = -1; |
231 | | // when type_str->len() is negative, std::min would return that invalid (negative) length as the limit, so we need to check it first |
232 | 0 | const auto* type_str = |
233 | 0 | check_and_get_data_type<DataTypeString>(remove_nullable(orig_type).get()); |
234 | 0 | if (type_str) { |
235 | 0 | if (type_str->len() >= 0) { |
236 | 0 | len = type_str->len(); |
237 | 0 | limit = std::min(limit, type_str->len()); |
238 | 0 | } |
239 | 0 | } |
240 | |
|
241 | 0 | const auto* tmp_column_ptr = check_and_get_column<ColumnNullable>(*orig_column); |
242 | 0 | const auto& tmp_real_column_ptr = |
243 | 0 | tmp_column_ptr == nullptr ? orig_column : (tmp_column_ptr->get_nested_column_ptr()); |
244 | 0 | const auto* column_string = assert_cast<const ColumnString*>(tmp_real_column_ptr.get()); |
245 | 0 | const auto* null_map = |
246 | 0 | tmp_column_ptr == nullptr ? nullptr : tmp_column_ptr->get_null_map_data().data(); |
247 | |
|
248 | 0 | const auto* __restrict offsets = column_string->get_offsets().data(); |
249 | 0 | int invalid_count = 0; |
250 | 0 | size_t row_count = orig_column->size(); |
251 | 0 | for (int64_t j = 0; j < row_count; ++j) { |
252 | 0 | invalid_count += (offsets[j] - offsets[j - 1]) > limit; |
253 | 0 | } |
254 | |
|
255 | 0 | if (invalid_count) { |
256 | | // For a string column in non-strict load mode (for both insert stmt and stream load), |
257 | | // truncate the string to the schema length. |
258 | | // After truncation we still need to check whether the byte length of each row exceeds the schema length, |
259 | | // because the schema length is currently defined in bytes while substring works in units of chars. |
260 | | // This is a workaround for now; it should be improved once multi-byte chars are better supported. |
261 | 0 | if (type_str && !state->enable_insert_strict()) { |
262 | 0 | ColumnsWithTypeAndName argument_template; |
263 | 0 | auto input_type = remove_nullable(orig_type); |
264 | 0 | auto pos_type = DataTypeFactory::instance().create_data_type( |
265 | 0 | FieldType::OLAP_FIELD_TYPE_INT, 0, 0); |
266 | 0 | auto len_type = DataTypeFactory::instance().create_data_type( |
267 | 0 | FieldType::OLAP_FIELD_TYPE_INT, 0, 0); |
268 | 0 | argument_template.emplace_back(nullptr, input_type, "string column"); |
269 | 0 | argument_template.emplace_back(nullptr, pos_type, "pos column"); |
270 | 0 | argument_template.emplace_back(nullptr, len_type, "len column"); |
271 | 0 | auto func = SimpleFunctionFactory::instance().get_function( |
272 | 0 | "substring", argument_template, input_type, {}, state->be_exec_version()); |
273 | 0 | if (!func) { |
274 | 0 | return Status::InternalError("get function substring failed"); |
275 | 0 | } |
276 | 0 | auto pos_column = pos_type->create_column_const(row_count, to_field<TYPE_INT>(1)); |
277 | 0 | auto len_column = |
278 | 0 | len_type->create_column_const(row_count, to_field<TYPE_INT>(limit)); |
279 | 0 | Block tmp_block({{remove_nullable(orig_column), input_type, "string column"}, |
280 | 0 | {pos_column, pos_type, "pos"}, |
281 | 0 | {len_column, len_type, "len"}, |
282 | 0 | {nullptr, input_type, "result"}}); |
283 | 0 | RETURN_IF_ERROR(func->execute(nullptr, tmp_block, {0, 1, 2}, 3, row_count)); |
284 | 0 | column_string = |
285 | 0 | assert_cast<const ColumnString*>(tmp_block.get_by_position(3).column.get()); |
286 | 0 | orig_column = |
287 | 0 | orig_column->is_nullable() |
288 | 0 | ? ColumnNullable::create(tmp_block.get_by_position(3).column, |
289 | 0 | tmp_column_ptr->get_null_map_column_ptr()) |
290 | 0 | : std::move(tmp_block.get_by_position(3).column); |
291 | 0 | } |
292 | 0 | for (size_t j = 0; j < row_count; ++j) { |
293 | 0 | auto row = rows ? (*rows)[j] : j; |
294 | 0 | if (need_to_validate(j, row, filter_map, null_map)) { |
295 | 0 | auto str_val = column_string->get_data_at(j); |
296 | 0 | bool invalid = str_val.size > limit; |
297 | 0 | if (invalid) { |
298 | 0 | if (str_val.size > len) { |
299 | 0 | fmt::format_to(error_msg, "{}", |
300 | 0 | "the length of input is too long than schema. "); |
301 | 0 | fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", |
302 | 0 | str_val.to_prefix(32)); |
303 | 0 | fmt::format_to(error_msg, "schema length: {}; ", len); |
304 | 0 | fmt::format_to(error_msg, "actual length: {}; ", str_val.size); |
305 | 0 | } else if (str_val.size > limit) { |
306 | 0 | fmt::format_to( |
307 | 0 | error_msg, "{}", |
308 | 0 | "the length of input string is too long than vec schema. "); |
309 | 0 | fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", |
310 | 0 | str_val.to_prefix(32)); |
311 | 0 | fmt::format_to(error_msg, "schema length: {}; ", len); |
312 | 0 | fmt::format_to(error_msg, "limit length: {}; ", limit); |
313 | 0 | fmt::format_to(error_msg, "actual length: {}; ", str_val.size); |
314 | 0 | } |
315 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
316 | 0 | } |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | 0 | return Status::OK(); |
321 | 0 | }; |
322 | | |
323 | 9 | switch (type->get_primitive_type()) { |
324 | 0 | case TYPE_CHAR: |
325 | 0 | case TYPE_VARCHAR: |
326 | 0 | case TYPE_STRING: { |
327 | 0 | RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map)); |
328 | 0 | block->get_by_position(slot_index).column = std::move(column); |
329 | 0 | break; |
330 | 0 | } |
331 | 0 | case TYPE_JSONB: { |
332 | 0 | const auto* column_string = assert_cast<const ColumnString*>(real_column_ptr.get()); |
333 | 0 | for (size_t j = 0; j < row_count; ++j) { |
334 | 0 | if (!_filter_map[j]) { |
335 | 0 | if (type->is_nullable() && column_ptr && column_ptr->is_null_at(j)) { |
336 | 0 | continue; |
337 | 0 | } |
338 | 0 | auto str_val = column_string->get_data_at(j); |
339 | 0 | bool invalid = str_val.size == 0; |
340 | 0 | if (invalid) { |
341 | 0 | error_msg.clear(); |
342 | 0 | fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid"); |
343 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); |
344 | 0 | } |
345 | 0 | } |
346 | 0 | } |
347 | 0 | break; |
348 | 0 | } |
349 | 0 | case TYPE_DECIMALV2: { |
350 | | // column_decimal utilizes the ColumnPtr from the block* block in _validate_data and can be modified. |
351 | 0 | auto* column_decimal = const_cast<ColumnDecimal128V2*>( |
352 | 0 | assert_cast<const ColumnDecimal128V2*>(real_column_ptr.get())); |
353 | 0 | const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type); |
354 | 0 | const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type); |
355 | 0 | for (size_t j = 0; j < row_count; ++j) { |
356 | 0 | auto row = rows ? (*rows)[j] : j; |
357 | 0 | if (need_to_validate(j, row, _filter_map, null_map)) { |
358 | 0 | auto dec_val = binary_cast<Int128, DecimalV2Value>(column_decimal->get_data()[j]); |
359 | 0 | bool invalid = false; |
360 | |
|
361 | 0 | if (dec_val.greater_than_scale(type->get_scale())) { |
362 | 0 | auto code = |
363 | 0 | dec_val.round(&dec_val, remove_nullable(type)->get_scale(), HALF_UP); |
364 | 0 | column_decimal->get_data()[j] = dec_val; |
365 | |
|
366 | 0 | if (code != E_DEC_OK) { |
367 | 0 | fmt::format_to(error_msg, "round one decimal failed.value={}; ", |
368 | 0 | dec_val.to_string()); |
369 | 0 | invalid = true; |
370 | 0 | } |
371 | 0 | } |
372 | 0 | if (dec_val > max_decimalv2 || dec_val < min_decimalv2) { |
373 | 0 | fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); |
374 | 0 | fmt::format_to(error_msg, ", value={}", dec_val.to_string()); |
375 | 0 | fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(), |
376 | 0 | type->get_scale()); |
377 | 0 | fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(), |
378 | 0 | max_decimalv2.to_string()); |
379 | 0 | invalid = true; |
380 | 0 | } |
381 | |
|
382 | 0 | if (invalid) { |
383 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
384 | 0 | } |
385 | 0 | } |
386 | 0 | } |
387 | 0 | break; |
388 | 0 | } |
389 | 0 | case TYPE_DECIMAL32: { |
390 | 0 | #define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType) \ |
391 | 0 | auto column_decimal = \ |
392 | 0 | assert_cast<const ColumnDecimal<DecimalType::PType>*>(real_column_ptr.get()); \ |
393 | 0 | const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type); \ |
394 | 0 | const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type); \ |
395 | 0 | const auto* __restrict datas = column_decimal->get_data().data(); \ |
396 | 0 | int invalid_count = 0; \ |
397 | 0 | for (int j = 0; j < row_count; ++j) { \ |
398 | 0 | const auto dec_val = datas[j]; \ |
399 | 0 | invalid_count += dec_val > max_decimal || dec_val < min_decimal; \ |
400 | 0 | } \ |
401 | 0 | if (invalid_count) { \ |
402 | 0 | for (size_t j = 0; j < row_count; ++j) { \ |
403 | 0 | auto row = rows ? (*rows)[j] : j; \ |
404 | 0 | if (need_to_validate(j, row, _filter_map, null_map)) { \ |
405 | 0 | auto dec_val = column_decimal->get_data()[j]; \ |
406 | 0 | bool invalid = false; \ |
407 | 0 | if (dec_val > max_decimal || dec_val < min_decimal) { \ |
408 | 0 | fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ |
409 | 0 | fmt::format_to(error_msg, ", value={}", dec_val.value); \ |
410 | 0 | fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(), \ |
411 | 0 | type->get_scale()); \ |
412 | 0 | fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal.value, \ |
413 | 0 | max_decimal.value); \ |
414 | 0 | invalid = true; \ |
415 | 0 | } \ |
416 | 0 | if (invalid) { \ |
417 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ |
418 | 0 | } \ |
419 | 0 | } \ |
420 | 0 | } \ |
421 | 0 | } |
422 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal32); |
423 | 0 | break; |
424 | 0 | } |
425 | 0 | case TYPE_DECIMAL64: { |
426 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal64); |
427 | 0 | break; |
428 | 0 | } |
429 | 0 | case TYPE_DECIMAL128I: { |
430 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal128V3); |
431 | 0 | break; |
432 | 0 | } |
433 | 0 | case TYPE_DECIMAL256: { |
434 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal256); |
435 | 0 | break; |
436 | 0 | } |
437 | 0 | #undef CHECK_VALIDATION_FOR_DECIMALV3 |
438 | 0 | case TYPE_ARRAY: { |
439 | 0 | const auto* column_array = assert_cast<const ColumnArray*>(real_column_ptr.get()); |
440 | 0 | const auto* type_array = assert_cast<const DataTypeArray*>(remove_nullable(type).get()); |
441 | 0 | auto nested_type = type_array->get_nested_type(); |
442 | 0 | const auto& offsets = column_array->get_offsets(); |
443 | 0 | IColumn::Permutation permutation(offsets.back()); |
444 | 0 | for (size_t r = 0; r < row_count; ++r) { |
445 | 0 | for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { |
446 | 0 | permutation[c] = rows ? (*rows)[r] : r; |
447 | 0 | } |
448 | 0 | } |
449 | 0 | fmt::format_to(error_prefix, "ARRAY type failed: "); |
450 | 0 | auto data_column_ptr = column_array->get_data_ptr(); |
451 | 0 | switch (nested_type->get_primitive_type()) { |
452 | 0 | case TYPE_CHAR: |
453 | 0 | case TYPE_VARCHAR: |
454 | 0 | case TYPE_STRING: { |
455 | 0 | RETURN_IF_ERROR( |
456 | 0 | string_column_checker(data_column_ptr, nested_type, &permutation, _filter_map)); |
457 | 0 | const_cast<ColumnArray*>(column_array)->get_data_ptr() = std::move(data_column_ptr); |
458 | 0 | break; |
459 | 0 | } |
460 | 0 | default: |
461 | 0 | RETURN_IF_ERROR(_validate_column(state, block, nested_type, data_column_ptr, slot_index, |
462 | 0 | error_prefix, permutation.size(), &permutation)); |
463 | 0 | break; |
464 | 0 | } |
465 | 0 | break; |
466 | 0 | } |
467 | 0 | case TYPE_MAP: { |
468 | 0 | const auto* column_map = assert_cast<const ColumnMap*>(real_column_ptr.get()); |
469 | | // column_map utilizes the ColumnPtr from the block* block in _validate_data and can be modified. |
470 | 0 | RETURN_IF_ERROR((const_cast<ColumnMap*>(column_map))->deduplicate_keys(true)); |
471 | | |
472 | 0 | const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(type).get()); |
473 | 0 | auto key_type = type_map->get_key_type(); |
474 | 0 | auto val_type = type_map->get_value_type(); |
475 | 0 | const auto& offsets = column_map->get_offsets(); |
476 | 0 | IColumn::Permutation permutation(offsets.back()); |
477 | 0 | for (size_t r = 0; r < row_count; ++r) { |
478 | 0 | for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { |
479 | 0 | permutation[c] = rows ? (*rows)[r] : r; |
480 | 0 | } |
481 | 0 | } |
482 | |
|
483 | 0 | fmt::format_to(error_prefix, "MAP type failed: "); |
484 | 0 | switch (key_type->get_primitive_type()) { |
485 | 0 | case TYPE_CHAR: |
486 | 0 | case TYPE_VARCHAR: |
487 | 0 | case TYPE_STRING: { |
488 | 0 | auto key_column_ptr = column_map->get_keys_ptr(); |
489 | 0 | RETURN_IF_ERROR( |
490 | 0 | string_column_checker(key_column_ptr, key_type, &permutation, _filter_map)); |
491 | 0 | const_cast<ColumnMap*>(column_map)->get_keys_ptr() = std::move(key_column_ptr); |
492 | 0 | break; |
493 | 0 | } |
494 | 0 | default: |
495 | 0 | RETURN_IF_ERROR(_validate_column(state, block, key_type, column_map->get_keys_ptr(), |
496 | 0 | slot_index, error_prefix, permutation.size(), |
497 | 0 | &permutation)); |
498 | 0 | break; |
499 | 0 | } |
500 | | |
501 | 0 | switch (val_type->get_primitive_type()) { |
502 | 0 | case TYPE_CHAR: |
503 | 0 | case TYPE_VARCHAR: |
504 | 0 | case TYPE_STRING: { |
505 | 0 | auto value_column_ptr = column_map->get_values_ptr(); |
506 | 0 | RETURN_IF_ERROR( |
507 | 0 | string_column_checker(value_column_ptr, val_type, &permutation, _filter_map)); |
508 | 0 | const_cast<ColumnMap*>(column_map)->get_values_ptr() = std::move(value_column_ptr); |
509 | 0 | break; |
510 | 0 | } |
511 | 0 | default: |
512 | 0 | RETURN_IF_ERROR(_validate_column(state, block, val_type, column_map->get_values_ptr(), |
513 | 0 | slot_index, error_prefix, permutation.size(), |
514 | 0 | &permutation)); |
515 | 0 | break; |
516 | 0 | } |
517 | 0 | break; |
518 | 0 | } |
519 | 0 | case TYPE_STRUCT: { |
520 | 0 | const auto column_struct = assert_cast<const ColumnStruct*>(real_column_ptr.get()); |
521 | 0 | const auto* type_struct = assert_cast<const DataTypeStruct*>(remove_nullable(type).get()); |
522 | 0 | DCHECK(type_struct->get_elements().size() == column_struct->tuple_size()); |
523 | 0 | fmt::format_to(error_prefix, "STRUCT type failed: "); |
524 | 0 | for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { |
525 | 0 | auto element_type = type_struct->get_element(sc); |
526 | 0 | switch (element_type->get_primitive_type()) { |
527 | 0 | case TYPE_CHAR: |
528 | 0 | case TYPE_VARCHAR: |
529 | 0 | case TYPE_STRING: { |
530 | 0 | auto element_column_ptr = column_struct->get_column_ptr(sc); |
531 | 0 | RETURN_IF_ERROR(string_column_checker(element_column_ptr, element_type, nullptr, |
532 | 0 | _filter_map)); |
533 | 0 | const_cast<ColumnStruct*>(column_struct)->get_column_ptr(sc) = |
534 | 0 | std::move(element_column_ptr); |
535 | 0 | break; |
536 | 0 | } |
537 | 0 | default: |
538 | 0 | RETURN_IF_ERROR(_validate_column(state, block, type_struct->get_element(sc), |
539 | 0 | column_struct->get_column_ptr(sc), slot_index, |
540 | 0 | error_prefix, |
541 | 0 | column_struct->get_column_ptr(sc)->size())); |
542 | 0 | break; |
543 | 0 | } |
544 | 0 | } |
545 | 0 | break; |
546 | 0 | } |
547 | 0 | case TYPE_AGG_STATE: { |
548 | 0 | auto* column_string = check_and_get_column<ColumnString>(*real_column_ptr); |
549 | 0 | if (column_string) { |
550 | 0 | RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map)); |
551 | 0 | } |
552 | 0 | break; |
553 | 0 | } |
554 | 9 | default: |
555 | 9 | break; |
556 | 9 | } |
557 | | |
558 | | // Handle columns that must not contain NULL values: rows holding a NULL are marked as filtered. |
559 | | // There are only two cases: |
560 | | // 1. column is nullable but the desc is not nullable |
561 | | // 2. desc->type is BITMAP |
562 | 9 | if ((!type->is_nullable() || type->get_primitive_type() == TYPE_BITMAP) && column_ptr) { |
563 | 0 | for (int j = 0; j < row_count; ++j) { |
564 | 0 | auto row = rows ? (*rows)[j] : j; |
565 | 0 | if (null_map[j] && !_filter_map[row]) { |
566 | 0 | fmt::format_to(error_msg, "null value for not null column, type={}", |
567 | 0 | type->get_name()); |
568 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
569 | 0 | } |
570 | 0 | } |
571 | 0 | } |
572 | | |
573 | 9 | return Status::OK(); |
574 | 9 | } |
575 | | |
576 | | Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, Block* block, const size_t rows, |
577 | 9 | int& filtered_rows) { |
578 | 9 | filtered_rows = 0; |
579 | 9 | Defer defer {[&] { |
580 | 26 | for (int i = 0; i < rows; ++i) { |
581 | 17 | filtered_rows += _filter_map[i]; |
582 | 17 | } |
583 | 9 | }}; |
584 | 18 | for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { |
585 | 9 | SlotDescriptor* desc = _output_tuple_desc->slots()[i]; |
586 | 9 | block->get_by_position(i).column = |
587 | 9 | block->get_by_position(i).column->convert_to_full_column_if_const(); |
588 | 9 | const auto& column = block->get_by_position(i).column; |
589 | | |
590 | 9 | fmt::memory_buffer error_prefix; |
591 | 9 | fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); |
592 | 9 | RETURN_IF_ERROR( |
593 | 9 | _validate_column(state, block, desc->type(), column, i, error_prefix, rows)); |
594 | 9 | } |
595 | 9 | return Status::OK(); |
596 | 9 | } |
597 | | |
598 | 9 | void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::Block* block) { |
599 | 18 | for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) { |
600 | 9 | SlotDescriptor* desc = _output_tuple_desc->slots()[i]; |
601 | 9 | if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) { |
602 | 0 | if (desc->is_nullable()) { |
603 | 0 | block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); |
604 | 0 | block->get_by_position(i).column = make_nullable(block->get_by_position(i).column); |
605 | 0 | } else { |
606 | 0 | block->get_by_position(i).type = |
607 | 0 | assert_cast<const DataTypeNullable&>(*block->get_by_position(i).type) |
608 | 0 | .get_nested_type(); |
609 | 0 | block->get_by_position(i).column = |
610 | 0 | assert_cast<const ColumnNullable&>(*block->get_by_position(i).column) |
611 | 0 | .get_nested_column_ptr(); |
612 | 0 | } |
613 | 0 | } |
614 | 9 | } |
615 | 9 | } |
616 | | |
617 | 0 | Status OlapTableBlockConvertor::_fill_auto_inc_cols(Block* block, size_t rows) { |
618 | 0 | size_t idx = _auto_inc_col_idx.value(); |
619 | 0 | SlotDescriptor* slot = _output_tuple_desc->slots()[idx]; |
620 | 0 | DCHECK(slot->type()->get_primitive_type() == PrimitiveType::TYPE_BIGINT); |
621 | 0 | DCHECK(!slot->is_nullable()); |
622 | |
|
623 | 0 | size_t null_value_count = 0; |
624 | 0 | auto dst_column = ColumnInt64::create(); |
625 | 0 | ColumnInt64::Container& dst_values = dst_column->get_data(); |
626 | |
|
627 | 0 | ColumnPtr src_column_ptr = block->get_by_position(idx).column; |
628 | 0 | if (const auto* const_column = check_and_get_column<ColumnConst>(src_column_ptr.get())) { |
629 | | // for insert stmt like "insert into tbl1 select null,col1,col2,... from tbl2" or |
630 | | // "insert into tbl1 select 1,col1,col2,... from tbl2", the type of literal's column |
631 | | // will be `ColumnConst` |
632 | 0 | if (const_column->is_null_at(0)) { |
633 | | // the inputs of the autoinc column are all null literals, |
634 | | // so fill the column with generated ids |
635 | 0 | null_value_count = rows; |
636 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
637 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
638 | 0 | for (auto [start, length] : res) { |
639 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
640 | 0 | } |
641 | |
|
642 | 0 | for (size_t i = 0; i < rows; i++) { |
643 | 0 | dst_values.emplace_back(_auto_inc_id_allocator.next_id()); |
644 | 0 | } |
645 | 0 | } else { |
646 | | // the input of autoinc column are all int64 literals |
647 | | // fill the column with that literal |
648 | 0 | int64_t value = const_column->get_int(0); |
649 | 0 | dst_values.resize_fill(rows, value); |
650 | 0 | } |
651 | 0 | } else if (const auto* src_nullable_column = |
652 | 0 | check_and_get_column<ColumnNullable>(src_column_ptr.get())) { |
653 | 0 | auto src_nested_column_ptr = src_nullable_column->get_nested_column_ptr(); |
654 | 0 | const auto& null_map_data = src_nullable_column->get_null_map_data(); |
655 | 0 | dst_values.reserve(rows); |
656 | 0 | for (size_t i = 0; i < rows; i++) { |
657 | 0 | null_value_count += null_map_data[i]; |
658 | 0 | } |
659 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
660 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
661 | 0 | for (auto [start, length] : res) { |
662 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
663 | 0 | } |
664 | |
|
665 | 0 | for (size_t i = 0; i < rows; i++) { |
666 | 0 | dst_values.emplace_back((null_map_data[i] != 0) ? _auto_inc_id_allocator.next_id() |
667 | 0 | : src_nested_column_ptr->get_int(i)); |
668 | 0 | } |
669 | 0 | } else { |
670 | 0 | return Status::OK(); |
671 | 0 | } |
672 | 0 | block->get_by_position(idx).column = std::move(dst_column); |
673 | 0 | block->get_by_position(idx).type = remove_nullable(slot->type()); |
674 | 0 | return Status::OK(); |
675 | 0 | } |
676 | | |
677 | 0 | Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(Block* block, size_t rows) { |
678 | 0 | auto dst_column = ColumnInt64::create(); |
679 | 0 | ColumnInt64::Container& dst_values = dst_column->get_data(); |
680 | 0 | size_t null_value_count = rows; |
681 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
682 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
683 | 0 | for (auto [start, length] : res) { |
684 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
685 | 0 | } |
686 | |
|
687 | 0 | for (size_t i = 0; i < rows; i++) { |
688 | 0 | dst_values.emplace_back(_auto_inc_id_allocator.next_id()); |
689 | 0 | } |
690 | 0 | block->insert(ColumnWithTypeAndName(std::move(dst_column), std::make_shared<DataTypeInt64>(), |
691 | 0 | BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)); |
692 | 0 | return Status::OK(); |
693 | 0 | } |
694 | | |
695 | | } // namespace doris |