be/src/exec/sink/vtablet_block_convertor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/vtablet_block_convertor.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/FrontendService.h> |
22 | | #include <glog/logging.h> |
23 | | #include <google/protobuf/stubs/common.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <memory> |
27 | | #include <string> |
28 | | #include <unordered_map> |
29 | | #include <utility> |
30 | | |
31 | | #include "common/compiler_util.h" // IWYU pragma: keep |
32 | | #include "common/consts.h" |
33 | | #include "common/status.h" |
34 | | #include "core/assert_cast.h" |
35 | | #include "core/binary_cast.hpp" |
36 | | #include "core/block/block.h" |
37 | | #include "core/column/column.h" |
38 | | #include "core/column/column_array.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/column/column_decimal.h" |
41 | | #include "core/column/column_map.h" |
42 | | #include "core/column/column_nullable.h" |
43 | | #include "core/column/column_string.h" |
44 | | #include "core/column/column_struct.h" |
45 | | #include "core/data_type/data_type.h" |
46 | | #include "core/data_type/data_type_array.h" |
47 | | #include "core/data_type/data_type_decimal.h" |
48 | | #include "core/data_type/data_type_factory.hpp" |
49 | | #include "core/data_type/data_type_map.h" |
50 | | #include "core/data_type/data_type_nullable.h" |
51 | | #include "core/data_type/data_type_struct.h" |
52 | | #include "core/data_type/define_primitive_type.h" |
53 | | #include "core/data_type/primitive_type.h" |
54 | | #include "core/types.h" |
55 | | #include "core/wide_integer_to_string.h" |
56 | | #include "exprs/function/function_helpers.h" |
57 | | #include "exprs/function/simple_function_factory.h" |
58 | | #include "exprs/vexpr.h" |
59 | | #include "exprs/vexpr_context.h" |
60 | | #include "runtime/descriptors.h" |
61 | | #include "runtime/runtime_state.h" |
62 | | #include "service/brpc.h" |
63 | | #include "storage/olap_common.h" |
64 | | #include "util/brpc_client_cache.h" |
65 | | #include "util/thread.h" |
66 | | |
67 | | namespace doris { |
68 | | #include "common/compile_check_begin.h" |
69 | | |
70 | | // !FIXME: Here we should consider using MutableBlock, due to potential data reorganization |
71 | | Status OlapTableBlockConvertor::validate_and_convert_block(RuntimeState* state, Block* input_block, |
72 | | std::shared_ptr<Block>& block, |
73 | | VExprContextSPtrs output_vexpr_ctxs, |
74 | 9 | size_t rows, bool& has_filtered_rows) { |
75 | 9 | DCHECK(input_block->rows() > 0); |
76 | | |
77 | 9 | block = Block::create_shared(input_block->get_columns_with_type_and_name()); |
78 | 9 | if (!output_vexpr_ctxs.empty()) { |
79 | | // Do vectorized expr here to speed up load |
80 | 0 | RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs( |
81 | 0 | output_vexpr_ctxs, *input_block, block.get())); |
82 | 0 | } |
83 | | |
84 | 9 | if (_is_partial_update_and_auto_inc) { |
85 | | // If this load is partial update and this table has a auto inc column, |
86 | | // e.g. table schema: k1, v1, v2(auto inc) |
87 | | // 1. insert columns include auto inc column |
88 | | // e.g. insert into table (k1, v2) value(a, 1); |
89 | | // we do nothing. |
90 | | // 2. insert columns do not include auto inc column |
91 | | // e.g. insert into table (k1, v1) value(a, a); |
92 | | // we need to fill auto_inc_cols by creating a new column. |
93 | 0 | if (!_auto_inc_col_idx.has_value()) { |
94 | 0 | RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows)); |
95 | 0 | } |
96 | 9 | } else if (_auto_inc_col_idx.has_value()) { |
97 | | // fill the valus for auto-increment columns |
98 | 0 | DCHECK_EQ(_is_partial_update_and_auto_inc, false); |
99 | 0 | RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows)); |
100 | 0 | } |
101 | | |
102 | 9 | int filtered_rows = 0; |
103 | 9 | { |
104 | 9 | SCOPED_RAW_TIMER(&_validate_data_ns); |
105 | 9 | _filter_map.clear(); |
106 | 9 | _filter_map.resize(rows, 0); |
107 | 9 | auto st = _validate_data(state, block.get(), rows, filtered_rows); |
108 | 9 | _num_filtered_rows += filtered_rows; |
109 | 9 | has_filtered_rows = filtered_rows > 0; |
110 | 9 | if (!st.ok()) { |
111 | 0 | return st; |
112 | 0 | } |
113 | 9 | _convert_to_dest_desc_block(block.get()); |
114 | 9 | } |
115 | | |
116 | 0 | return Status::OK(); |
117 | 9 | } |
118 | | |
119 | | void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size, |
120 | | bool is_partial_update_and_auto_inc, |
121 | 2 | int32_t auto_increment_column_unique_id) { |
122 | 2 | _batch_size = batch_size; |
123 | 2 | if (is_partial_update_and_auto_inc) { |
124 | 0 | _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc; |
125 | 0 | _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
126 | 0 | db_id, table_id, auto_increment_column_unique_id); |
127 | 0 | return; |
128 | 0 | } |
129 | 4 | for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) { |
130 | 2 | if (_output_tuple_desc->slots()[idx]->is_auto_increment()) { |
131 | 0 | _auto_inc_col_idx = idx; |
132 | 0 | _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
133 | 0 | db_id, table_id, _output_tuple_desc->slots()[idx]->col_unique_id()); |
134 | 0 | _auto_inc_id_buffer->set_batch_size_at_least(_batch_size); |
135 | 0 | break; |
136 | 0 | } |
137 | 2 | } |
138 | 2 | } |
139 | | |
140 | | template <bool is_min> |
141 | 0 | DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const DataTypePtr& type) { |
142 | 0 | std::map<std::pair<int, int>, DecimalV2Value>* pmap; |
143 | 0 | if constexpr (is_min) { |
144 | 0 | pmap = &_min_decimalv2_val; |
145 | 0 | } else { |
146 | 0 | pmap = &_max_decimalv2_val; |
147 | 0 | } |
148 | | |
149 | | // found |
150 | 0 | auto iter = pmap->find( |
151 | 0 | {remove_nullable(type)->get_precision(), remove_nullable(type)->get_scale()}); |
152 | 0 | if (iter != pmap->end()) { |
153 | 0 | return iter->second; |
154 | 0 | } |
155 | | |
156 | | // save min or max DecimalV2Value for next time |
157 | 0 | DecimalV2Value value; |
158 | 0 | if constexpr (is_min) { |
159 | 0 | value.to_min_decimal(type->get_precision(), type->get_scale()); |
160 | 0 | } else { |
161 | 0 | value.to_max_decimal(type->get_precision(), type->get_scale()); |
162 | 0 | } |
163 | 0 | pmap->emplace(std::pair<int, int> {type->get_precision(), type->get_scale()}, value); |
164 | 0 | return value; |
165 | 0 | } Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb0EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb1EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE |
166 | | |
167 | | template <typename DecimalType, bool IsMin> |
168 | 0 | DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const DataTypePtr& type) { |
169 | 0 | std::map<int, typename DecimalType::NativeType>* pmap; |
170 | 0 | if constexpr (std::is_same_v<DecimalType, Decimal32>) { |
171 | 0 | pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val; |
172 | 0 | } else if constexpr (std::is_same_v<DecimalType, Decimal64>) { |
173 | 0 | pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val; |
174 | 0 | } else if constexpr (std::is_same_v<DecimalType, Decimal128V3>) { |
175 | 0 | pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val; |
176 | 0 | } else { |
177 | 0 | pmap = IsMin ? &_min_decimal256_val : &_max_decimal256_val; |
178 | 0 | } |
179 | | |
180 | | // found |
181 | 0 | auto iter = pmap->find(type->get_precision()); |
182 | 0 | if (iter != pmap->end()) { |
183 | 0 | return DecimalType(iter->second); |
184 | 0 | } |
185 | | |
186 | 0 | DecimalType value; |
187 | 0 | if constexpr (IsMin) { |
188 | 0 | value = min_decimal_value<DecimalType::PType>(type->get_precision()); |
189 | 0 | } else { |
190 | 0 | value = max_decimal_value<DecimalType::PType>(type->get_precision()); |
191 | 0 | } |
192 | 0 | pmap->emplace(type->get_precision(), value.value); |
193 | 0 | return value; |
194 | 0 | } Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE |
195 | | |
196 | | Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, Block* block, |
197 | | const DataTypePtr& type, ColumnPtr column, |
198 | | size_t slot_index, |
199 | | fmt::memory_buffer& error_prefix, |
200 | | const size_t row_count, |
201 | 9 | IColumn::Permutation* rows) { |
202 | 9 | DCHECK((rows == nullptr) || (rows->size() == row_count)); |
203 | 9 | fmt::memory_buffer error_msg; |
204 | 9 | auto set_invalid_and_append_error_msg = [&](size_t row) { |
205 | 0 | _filter_map[row] = true; |
206 | 0 | auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; }, |
207 | 0 | [&error_prefix, &error_msg]() -> std::string { |
208 | 0 | return fmt::to_string(error_prefix) + |
209 | 0 | fmt::to_string(error_msg); |
210 | 0 | }); |
211 | 0 | error_msg.clear(); |
212 | 0 | return ret; |
213 | 0 | }; |
214 | | |
215 | 9 | const auto* column_ptr = check_and_get_column<ColumnNullable>(*column); |
216 | 9 | const auto& real_column_ptr = |
217 | 9 | column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr()); |
218 | 9 | const auto* null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data(); |
219 | 9 | auto need_to_validate = [](size_t j, size_t row, const std::vector<char>& filter_map, |
220 | 9 | const unsigned char* null_map) { |
221 | 0 | return !filter_map[row] && (null_map == nullptr || null_map[j] == 0); |
222 | 0 | }; |
223 | | |
224 | | // may change orig_column if substring function is performed |
225 | 9 | auto string_column_checker = [&state, &error_msg, need_to_validate, |
226 | 9 | set_invalid_and_append_error_msg]( |
227 | 9 | ColumnPtr& orig_column, const DataTypePtr& orig_type, |
228 | 9 | IColumn::Permutation* rows, |
229 | 9 | const std::vector<char>& filter_map) { |
230 | 0 | int limit = config::string_type_length_soft_limit_bytes; |
231 | 0 | int len = -1; |
232 | | // when type.len is negative, std::min will return overflow value, so we need to check it |
233 | 0 | const auto* type_str = |
234 | 0 | check_and_get_data_type<DataTypeString>(remove_nullable(orig_type).get()); |
235 | 0 | if (type_str) { |
236 | 0 | if (type_str->len() >= 0) { |
237 | 0 | len = type_str->len(); |
238 | 0 | limit = std::min(limit, type_str->len()); |
239 | 0 | } |
240 | 0 | } |
241 | |
|
242 | 0 | const auto* tmp_column_ptr = check_and_get_column<ColumnNullable>(*orig_column); |
243 | 0 | const auto& tmp_real_column_ptr = |
244 | 0 | tmp_column_ptr == nullptr ? orig_column : (tmp_column_ptr->get_nested_column_ptr()); |
245 | 0 | const auto* column_string = assert_cast<const ColumnString*>(tmp_real_column_ptr.get()); |
246 | 0 | const auto* null_map = |
247 | 0 | tmp_column_ptr == nullptr ? nullptr : tmp_column_ptr->get_null_map_data().data(); |
248 | |
|
249 | 0 | const auto* __restrict offsets = column_string->get_offsets().data(); |
250 | 0 | int invalid_count = 0; |
251 | 0 | size_t row_count = orig_column->size(); |
252 | 0 | for (int64_t j = 0; j < row_count; ++j) { |
253 | 0 | invalid_count += (offsets[j] - offsets[j - 1]) > limit; |
254 | 0 | } |
255 | |
|
256 | 0 | if (invalid_count) { |
257 | | // For string column, if in non-strict load mode(for both insert stmt and stream load), |
258 | | // truncate the string to schema len. |
259 | | // After truncation, still need to check if byte len of each row exceed the schema len, |
260 | | // because currently the schema len is defined in bytes, and substring works by unit of chars. |
261 | | // This is a workaround for now, need to improve it after better support of multi-byte chars. |
262 | 0 | if (type_str && !state->enable_insert_strict()) { |
263 | 0 | ColumnsWithTypeAndName argument_template; |
264 | 0 | auto input_type = remove_nullable(orig_type); |
265 | 0 | auto pos_type = DataTypeFactory::instance().create_data_type( |
266 | 0 | FieldType::OLAP_FIELD_TYPE_INT, 0, 0); |
267 | 0 | auto len_type = DataTypeFactory::instance().create_data_type( |
268 | 0 | FieldType::OLAP_FIELD_TYPE_INT, 0, 0); |
269 | 0 | argument_template.emplace_back(nullptr, input_type, "string column"); |
270 | 0 | argument_template.emplace_back(nullptr, pos_type, "pos column"); |
271 | 0 | argument_template.emplace_back(nullptr, len_type, "len column"); |
272 | 0 | auto func = SimpleFunctionFactory::instance().get_function( |
273 | 0 | "substring", argument_template, input_type, {}, state->be_exec_version()); |
274 | 0 | if (!func) { |
275 | 0 | return Status::InternalError("get function substring failed"); |
276 | 0 | } |
277 | 0 | auto pos_column = pos_type->create_column_const(row_count, to_field<TYPE_INT>(1)); |
278 | 0 | auto len_column = |
279 | 0 | len_type->create_column_const(row_count, to_field<TYPE_INT>(limit)); |
280 | 0 | Block tmp_block({{remove_nullable(orig_column), input_type, "string column"}, |
281 | 0 | {pos_column, pos_type, "pos"}, |
282 | 0 | {len_column, len_type, "len"}, |
283 | 0 | {nullptr, input_type, "result"}}); |
284 | 0 | RETURN_IF_ERROR(func->execute(nullptr, tmp_block, {0, 1, 2}, 3, row_count)); |
285 | 0 | column_string = |
286 | 0 | assert_cast<const ColumnString*>(tmp_block.get_by_position(3).column.get()); |
287 | 0 | orig_column = |
288 | 0 | orig_column->is_nullable() |
289 | 0 | ? ColumnNullable::create(tmp_block.get_by_position(3).column, |
290 | 0 | tmp_column_ptr->get_null_map_column_ptr()) |
291 | 0 | : std::move(tmp_block.get_by_position(3).column); |
292 | 0 | } |
293 | 0 | for (size_t j = 0; j < row_count; ++j) { |
294 | 0 | auto row = rows ? (*rows)[j] : j; |
295 | 0 | if (need_to_validate(j, row, filter_map, null_map)) { |
296 | 0 | auto str_val = column_string->get_data_at(j); |
297 | 0 | bool invalid = str_val.size > limit; |
298 | 0 | if (invalid) { |
299 | 0 | if (str_val.size > len) { |
300 | 0 | fmt::format_to(error_msg, "{}", |
301 | 0 | "the length of input is too long than schema. "); |
302 | 0 | fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", |
303 | 0 | str_val.to_prefix(32)); |
304 | 0 | fmt::format_to(error_msg, "schema length: {}; ", len); |
305 | 0 | fmt::format_to(error_msg, "actual length: {}; ", str_val.size); |
306 | 0 | } else if (str_val.size > limit) { |
307 | 0 | fmt::format_to( |
308 | 0 | error_msg, "{}", |
309 | 0 | "the length of input string is too long than vec schema. "); |
310 | 0 | fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ", |
311 | 0 | str_val.to_prefix(32)); |
312 | 0 | fmt::format_to(error_msg, "schema length: {}; ", len); |
313 | 0 | fmt::format_to(error_msg, "limit length: {}; ", limit); |
314 | 0 | fmt::format_to(error_msg, "actual length: {}; ", str_val.size); |
315 | 0 | } |
316 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | 0 | } |
321 | 0 | return Status::OK(); |
322 | 0 | }; |
323 | | |
324 | 9 | switch (type->get_primitive_type()) { |
325 | 0 | case TYPE_CHAR: |
326 | 0 | case TYPE_VARCHAR: |
327 | 0 | case TYPE_STRING: { |
328 | 0 | RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map)); |
329 | 0 | block->get_by_position(slot_index).column = std::move(column); |
330 | 0 | break; |
331 | 0 | } |
332 | 0 | case TYPE_JSONB: { |
333 | 0 | const auto* column_string = assert_cast<const ColumnString*>(real_column_ptr.get()); |
334 | 0 | for (size_t j = 0; j < row_count; ++j) { |
335 | 0 | if (!_filter_map[j]) { |
336 | 0 | if (type->is_nullable() && column_ptr && column_ptr->is_null_at(j)) { |
337 | 0 | continue; |
338 | 0 | } |
339 | 0 | auto str_val = column_string->get_data_at(j); |
340 | 0 | bool invalid = str_val.size == 0; |
341 | 0 | if (invalid) { |
342 | 0 | error_msg.clear(); |
343 | 0 | fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid"); |
344 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(j)); |
345 | 0 | } |
346 | 0 | } |
347 | 0 | } |
348 | 0 | break; |
349 | 0 | } |
350 | 0 | case TYPE_DECIMALV2: { |
351 | | // column_decimal utilizes the ColumnPtr from the block* block in _validate_data and can be modified. |
352 | 0 | auto* column_decimal = const_cast<ColumnDecimal128V2*>( |
353 | 0 | assert_cast<const ColumnDecimal128V2*>(real_column_ptr.get())); |
354 | 0 | const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type); |
355 | 0 | const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type); |
356 | 0 | for (size_t j = 0; j < row_count; ++j) { |
357 | 0 | auto row = rows ? (*rows)[j] : j; |
358 | 0 | if (need_to_validate(j, row, _filter_map, null_map)) { |
359 | 0 | auto dec_val = binary_cast<Int128, DecimalV2Value>(column_decimal->get_data()[j]); |
360 | 0 | bool invalid = false; |
361 | |
|
362 | 0 | if (dec_val.greater_than_scale(type->get_scale())) { |
363 | 0 | auto code = |
364 | 0 | dec_val.round(&dec_val, remove_nullable(type)->get_scale(), HALF_UP); |
365 | 0 | column_decimal->get_data()[j] = dec_val; |
366 | |
|
367 | 0 | if (code != E_DEC_OK) { |
368 | 0 | fmt::format_to(error_msg, "round one decimal failed.value={}; ", |
369 | 0 | dec_val.to_string()); |
370 | 0 | invalid = true; |
371 | 0 | } |
372 | 0 | } |
373 | 0 | if (dec_val > max_decimalv2 || dec_val < min_decimalv2) { |
374 | 0 | fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); |
375 | 0 | fmt::format_to(error_msg, ", value={}", dec_val.to_string()); |
376 | 0 | fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(), |
377 | 0 | type->get_scale()); |
378 | 0 | fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(), |
379 | 0 | max_decimalv2.to_string()); |
380 | 0 | invalid = true; |
381 | 0 | } |
382 | |
|
383 | 0 | if (invalid) { |
384 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
385 | 0 | } |
386 | 0 | } |
387 | 0 | } |
388 | 0 | break; |
389 | 0 | } |
390 | 0 | case TYPE_DECIMAL32: { |
391 | 0 | #define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType) \ |
392 | 0 | auto column_decimal = \ |
393 | 0 | assert_cast<const ColumnDecimal<DecimalType::PType>*>(real_column_ptr.get()); \ |
394 | 0 | const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type); \ |
395 | 0 | const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type); \ |
396 | 0 | const auto* __restrict datas = column_decimal->get_data().data(); \ |
397 | 0 | int invalid_count = 0; \ |
398 | 0 | for (int j = 0; j < row_count; ++j) { \ |
399 | 0 | const auto dec_val = datas[j]; \ |
400 | 0 | invalid_count += dec_val > max_decimal || dec_val < min_decimal; \ |
401 | 0 | } \ |
402 | 0 | if (invalid_count) { \ |
403 | 0 | for (size_t j = 0; j < row_count; ++j) { \ |
404 | 0 | auto row = rows ? (*rows)[j] : j; \ |
405 | 0 | if (need_to_validate(j, row, _filter_map, null_map)) { \ |
406 | 0 | auto dec_val = column_decimal->get_data()[j]; \ |
407 | 0 | bool invalid = false; \ |
408 | 0 | if (dec_val > max_decimal || dec_val < min_decimal) { \ |
409 | 0 | fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \ |
410 | 0 | fmt::format_to(error_msg, ", value={}", dec_val.value); \ |
411 | 0 | fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(), \ |
412 | 0 | type->get_scale()); \ |
413 | 0 | fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal.value, \ |
414 | 0 | max_decimal.value); \ |
415 | 0 | invalid = true; \ |
416 | 0 | } \ |
417 | 0 | if (invalid) { \ |
418 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); \ |
419 | 0 | } \ |
420 | 0 | } \ |
421 | 0 | } \ |
422 | 0 | } |
423 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal32); |
424 | 0 | break; |
425 | 0 | } |
426 | 0 | case TYPE_DECIMAL64: { |
427 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal64); |
428 | 0 | break; |
429 | 0 | } |
430 | 0 | case TYPE_DECIMAL128I: { |
431 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal128V3); |
432 | 0 | break; |
433 | 0 | } |
434 | 0 | case TYPE_DECIMAL256: { |
435 | 0 | CHECK_VALIDATION_FOR_DECIMALV3(Decimal256); |
436 | 0 | break; |
437 | 0 | } |
438 | 0 | #undef CHECK_VALIDATION_FOR_DECIMALV3 |
439 | 0 | case TYPE_ARRAY: { |
440 | 0 | const auto* column_array = assert_cast<const ColumnArray*>(real_column_ptr.get()); |
441 | 0 | const auto* type_array = assert_cast<const DataTypeArray*>(remove_nullable(type).get()); |
442 | 0 | auto nested_type = type_array->get_nested_type(); |
443 | 0 | const auto& offsets = column_array->get_offsets(); |
444 | 0 | IColumn::Permutation permutation(offsets.back()); |
445 | 0 | for (size_t r = 0; r < row_count; ++r) { |
446 | 0 | for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { |
447 | 0 | permutation[c] = rows ? (*rows)[r] : r; |
448 | 0 | } |
449 | 0 | } |
450 | 0 | fmt::format_to(error_prefix, "ARRAY type failed: "); |
451 | 0 | auto data_column_ptr = column_array->get_data_ptr(); |
452 | 0 | switch (nested_type->get_primitive_type()) { |
453 | 0 | case TYPE_CHAR: |
454 | 0 | case TYPE_VARCHAR: |
455 | 0 | case TYPE_STRING: { |
456 | 0 | RETURN_IF_ERROR( |
457 | 0 | string_column_checker(data_column_ptr, nested_type, &permutation, _filter_map)); |
458 | 0 | const_cast<ColumnArray*>(column_array)->get_data_ptr() = std::move(data_column_ptr); |
459 | 0 | break; |
460 | 0 | } |
461 | 0 | default: |
462 | 0 | RETURN_IF_ERROR(_validate_column(state, block, nested_type, data_column_ptr, slot_index, |
463 | 0 | error_prefix, permutation.size(), &permutation)); |
464 | 0 | break; |
465 | 0 | } |
466 | 0 | break; |
467 | 0 | } |
468 | 0 | case TYPE_MAP: { |
469 | 0 | const auto* column_map = assert_cast<const ColumnMap*>(real_column_ptr.get()); |
470 | | // column_map utilizes the ColumnPtr from the block* block in _validate_data and can be modified. |
471 | 0 | RETURN_IF_ERROR((const_cast<ColumnMap*>(column_map))->deduplicate_keys(true)); |
472 | | |
473 | 0 | const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(type).get()); |
474 | 0 | auto key_type = type_map->get_key_type(); |
475 | 0 | auto val_type = type_map->get_value_type(); |
476 | 0 | const auto& offsets = column_map->get_offsets(); |
477 | 0 | IColumn::Permutation permutation(offsets.back()); |
478 | 0 | for (size_t r = 0; r < row_count; ++r) { |
479 | 0 | for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) { |
480 | 0 | permutation[c] = rows ? (*rows)[r] : r; |
481 | 0 | } |
482 | 0 | } |
483 | |
|
484 | 0 | fmt::format_to(error_prefix, "MAP type failed: "); |
485 | 0 | switch (key_type->get_primitive_type()) { |
486 | 0 | case TYPE_CHAR: |
487 | 0 | case TYPE_VARCHAR: |
488 | 0 | case TYPE_STRING: { |
489 | 0 | auto key_column_ptr = column_map->get_keys_ptr(); |
490 | 0 | RETURN_IF_ERROR( |
491 | 0 | string_column_checker(key_column_ptr, key_type, &permutation, _filter_map)); |
492 | 0 | const_cast<ColumnMap*>(column_map)->get_keys_ptr() = std::move(key_column_ptr); |
493 | 0 | break; |
494 | 0 | } |
495 | 0 | default: |
496 | 0 | RETURN_IF_ERROR(_validate_column(state, block, key_type, column_map->get_keys_ptr(), |
497 | 0 | slot_index, error_prefix, permutation.size(), |
498 | 0 | &permutation)); |
499 | 0 | break; |
500 | 0 | } |
501 | | |
502 | 0 | switch (val_type->get_primitive_type()) { |
503 | 0 | case TYPE_CHAR: |
504 | 0 | case TYPE_VARCHAR: |
505 | 0 | case TYPE_STRING: { |
506 | 0 | auto value_column_ptr = column_map->get_values_ptr(); |
507 | 0 | RETURN_IF_ERROR( |
508 | 0 | string_column_checker(value_column_ptr, val_type, &permutation, _filter_map)); |
509 | 0 | const_cast<ColumnMap*>(column_map)->get_values_ptr() = std::move(value_column_ptr); |
510 | 0 | break; |
511 | 0 | } |
512 | 0 | default: |
513 | 0 | RETURN_IF_ERROR(_validate_column(state, block, val_type, column_map->get_values_ptr(), |
514 | 0 | slot_index, error_prefix, permutation.size(), |
515 | 0 | &permutation)); |
516 | 0 | break; |
517 | 0 | } |
518 | 0 | break; |
519 | 0 | } |
520 | 0 | case TYPE_STRUCT: { |
521 | 0 | const auto column_struct = assert_cast<const ColumnStruct*>(real_column_ptr.get()); |
522 | 0 | const auto* type_struct = assert_cast<const DataTypeStruct*>(remove_nullable(type).get()); |
523 | 0 | DCHECK(type_struct->get_elements().size() == column_struct->tuple_size()); |
524 | 0 | fmt::format_to(error_prefix, "STRUCT type failed: "); |
525 | 0 | for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) { |
526 | 0 | auto element_type = type_struct->get_element(sc); |
527 | 0 | switch (element_type->get_primitive_type()) { |
528 | 0 | case TYPE_CHAR: |
529 | 0 | case TYPE_VARCHAR: |
530 | 0 | case TYPE_STRING: { |
531 | 0 | auto element_column_ptr = column_struct->get_column_ptr(sc); |
532 | 0 | RETURN_IF_ERROR(string_column_checker(element_column_ptr, element_type, nullptr, |
533 | 0 | _filter_map)); |
534 | 0 | const_cast<ColumnStruct*>(column_struct)->get_column_ptr(sc) = |
535 | 0 | std::move(element_column_ptr); |
536 | 0 | break; |
537 | 0 | } |
538 | 0 | default: |
539 | 0 | RETURN_IF_ERROR(_validate_column(state, block, type_struct->get_element(sc), |
540 | 0 | column_struct->get_column_ptr(sc), slot_index, |
541 | 0 | error_prefix, |
542 | 0 | column_struct->get_column_ptr(sc)->size())); |
543 | 0 | break; |
544 | 0 | } |
545 | 0 | } |
546 | 0 | break; |
547 | 0 | } |
548 | 0 | case TYPE_AGG_STATE: { |
549 | 0 | auto* column_string = check_and_get_column<ColumnString>(*real_column_ptr); |
550 | 0 | if (column_string) { |
551 | 0 | RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map)); |
552 | 0 | } |
553 | 0 | break; |
554 | 0 | } |
555 | 9 | default: |
556 | 9 | break; |
557 | 9 | } |
558 | | |
559 | | // Dispose the column should do not contain the NULL value |
560 | | // Only two case: |
561 | | // 1. column is nullable but the desc is not nullable |
562 | | // 2. desc->type is BITMAP |
563 | 9 | if ((!type->is_nullable() || type->get_primitive_type() == TYPE_BITMAP) && column_ptr) { |
564 | 0 | for (int j = 0; j < row_count; ++j) { |
565 | 0 | auto row = rows ? (*rows)[j] : j; |
566 | 0 | if (null_map[j] && !_filter_map[row]) { |
567 | 0 | fmt::format_to(error_msg, "null value for not null column, type={}", |
568 | 0 | type->get_name()); |
569 | 0 | RETURN_IF_ERROR(set_invalid_and_append_error_msg(row)); |
570 | 0 | } |
571 | 0 | } |
572 | 0 | } |
573 | | |
574 | 9 | return Status::OK(); |
575 | 9 | } |
576 | | |
577 | | Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, Block* block, const size_t rows, |
578 | 9 | int& filtered_rows) { |
579 | 9 | filtered_rows = 0; |
580 | 9 | Defer defer {[&] { |
581 | 26 | for (int i = 0; i < rows; ++i) { |
582 | 17 | filtered_rows += _filter_map[i]; |
583 | 17 | } |
584 | 9 | }}; |
585 | 18 | for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) { |
586 | 9 | SlotDescriptor* desc = _output_tuple_desc->slots()[i]; |
587 | 9 | block->get_by_position(i).column = |
588 | 9 | block->get_by_position(i).column->convert_to_full_column_if_const(); |
589 | 9 | const auto& column = block->get_by_position(i).column; |
590 | | |
591 | 9 | fmt::memory_buffer error_prefix; |
592 | 9 | fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name()); |
593 | 9 | RETURN_IF_ERROR( |
594 | 9 | _validate_column(state, block, desc->type(), column, i, error_prefix, rows)); |
595 | 9 | } |
596 | 9 | return Status::OK(); |
597 | 9 | } |
598 | | |
599 | 9 | void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::Block* block) { |
600 | 18 | for (int i = 0; i < _output_tuple_desc->slots().size() && i < block->columns(); ++i) { |
601 | 9 | SlotDescriptor* desc = _output_tuple_desc->slots()[i]; |
602 | 9 | if (desc->is_nullable() != block->get_by_position(i).type->is_nullable()) { |
603 | 0 | if (desc->is_nullable()) { |
604 | 0 | block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); |
605 | 0 | block->get_by_position(i).column = make_nullable(block->get_by_position(i).column); |
606 | 0 | } else { |
607 | 0 | block->get_by_position(i).type = |
608 | 0 | assert_cast<const DataTypeNullable&>(*block->get_by_position(i).type) |
609 | 0 | .get_nested_type(); |
610 | 0 | block->get_by_position(i).column = |
611 | 0 | assert_cast<const ColumnNullable&>(*block->get_by_position(i).column) |
612 | 0 | .get_nested_column_ptr(); |
613 | 0 | } |
614 | 0 | } |
615 | 9 | } |
616 | 9 | } |
617 | | |
618 | 0 | Status OlapTableBlockConvertor::_fill_auto_inc_cols(Block* block, size_t rows) { |
619 | 0 | size_t idx = _auto_inc_col_idx.value(); |
620 | 0 | SlotDescriptor* slot = _output_tuple_desc->slots()[idx]; |
621 | 0 | DCHECK(slot->type()->get_primitive_type() == PrimitiveType::TYPE_BIGINT); |
622 | 0 | DCHECK(!slot->is_nullable()); |
623 | |
|
624 | 0 | size_t null_value_count = 0; |
625 | 0 | auto dst_column = ColumnInt64::create(); |
626 | 0 | ColumnInt64::Container& dst_values = dst_column->get_data(); |
627 | |
|
628 | 0 | ColumnPtr src_column_ptr = block->get_by_position(idx).column; |
629 | 0 | if (const auto* const_column = check_and_get_column<ColumnConst>(src_column_ptr.get())) { |
630 | | // for insert stmt like "insert into tbl1 select null,col1,col2,... from tbl2" or |
631 | | // "insert into tbl1 select 1,col1,col2,... from tbl2", the type of literal's column |
632 | | // will be `ColumnConst` |
633 | 0 | if (const_column->is_null_at(0)) { |
634 | | // the input of autoinc column are all null literals |
635 | | // fill the column with generated ids |
636 | 0 | null_value_count = rows; |
637 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
638 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
639 | 0 | for (auto [start, length] : res) { |
640 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
641 | 0 | } |
642 | |
|
643 | 0 | for (size_t i = 0; i < rows; i++) { |
644 | 0 | dst_values.emplace_back(_auto_inc_id_allocator.next_id()); |
645 | 0 | } |
646 | 0 | } else { |
647 | | // the input of autoinc column are all int64 literals |
648 | | // fill the column with that literal |
649 | 0 | int64_t value = const_column->get_int(0); |
650 | 0 | dst_values.resize_fill(rows, value); |
651 | 0 | } |
652 | 0 | } else if (const auto* src_nullable_column = |
653 | 0 | check_and_get_column<ColumnNullable>(src_column_ptr.get())) { |
654 | 0 | auto src_nested_column_ptr = src_nullable_column->get_nested_column_ptr(); |
655 | 0 | const auto& null_map_data = src_nullable_column->get_null_map_data(); |
656 | 0 | dst_values.reserve(rows); |
657 | 0 | for (size_t i = 0; i < rows; i++) { |
658 | 0 | null_value_count += null_map_data[i]; |
659 | 0 | } |
660 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
661 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
662 | 0 | for (auto [start, length] : res) { |
663 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
664 | 0 | } |
665 | |
|
666 | 0 | for (size_t i = 0; i < rows; i++) { |
667 | 0 | dst_values.emplace_back((null_map_data[i] != 0) ? _auto_inc_id_allocator.next_id() |
668 | 0 | : src_nested_column_ptr->get_int(i)); |
669 | 0 | } |
670 | 0 | } else { |
671 | 0 | return Status::OK(); |
672 | 0 | } |
673 | 0 | block->get_by_position(idx).column = std::move(dst_column); |
674 | 0 | block->get_by_position(idx).type = remove_nullable(slot->type()); |
675 | 0 | return Status::OK(); |
676 | 0 | } |
677 | | |
678 | 0 | Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(Block* block, size_t rows) { |
679 | 0 | auto dst_column = ColumnInt64::create(); |
680 | 0 | ColumnInt64::Container& dst_values = dst_column->get_data(); |
681 | 0 | size_t null_value_count = rows; |
682 | 0 | std::vector<std::pair<int64_t, size_t>> res; |
683 | 0 | RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(null_value_count, &res)); |
684 | 0 | for (auto [start, length] : res) { |
685 | 0 | _auto_inc_id_allocator.insert_ids(start, length); |
686 | 0 | } |
687 | |
|
688 | 0 | for (size_t i = 0; i < rows; i++) { |
689 | 0 | dst_values.emplace_back(_auto_inc_id_allocator.next_id()); |
690 | 0 | } |
691 | 0 | block->insert(ColumnWithTypeAndName(std::move(dst_column), std::make_shared<DataTypeInt64>(), |
692 | 0 | BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)); |
693 | 0 | return Status::OK(); |
694 | 0 | } |
695 | | |
696 | | } // namespace doris |