Coverage Report

Created: 2026-03-16 19:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/vtablet_block_convertor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/vtablet_block_convertor.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <glog/logging.h>
23
#include <google/protobuf/stubs/common.h>
24
25
#include <algorithm>
26
#include <memory>
27
#include <string>
28
#include <unordered_map>
29
#include <utility>
30
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/binary_cast.hpp"
36
#include "core/block/block.h"
37
#include "core/column/column.h"
38
#include "core/column/column_array.h"
39
#include "core/column/column_const.h"
40
#include "core/column/column_decimal.h"
41
#include "core/column/column_map.h"
42
#include "core/column/column_nullable.h"
43
#include "core/column/column_string.h"
44
#include "core/column/column_struct.h"
45
#include "core/data_type/data_type.h"
46
#include "core/data_type/data_type_array.h"
47
#include "core/data_type/data_type_decimal.h"
48
#include "core/data_type/data_type_factory.hpp"
49
#include "core/data_type/data_type_map.h"
50
#include "core/data_type/data_type_nullable.h"
51
#include "core/data_type/data_type_struct.h"
52
#include "core/data_type/define_primitive_type.h"
53
#include "core/data_type/primitive_type.h"
54
#include "core/types.h"
55
#include "core/wide_integer_to_string.h"
56
#include "exprs/function/function_helpers.h"
57
#include "exprs/function/simple_function_factory.h"
58
#include "exprs/vexpr.h"
59
#include "exprs/vexpr_context.h"
60
#include "runtime/descriptors.h"
61
#include "runtime/runtime_state.h"
62
#include "service/brpc.h"
63
#include "storage/olap_common.h"
64
#include "util/brpc_client_cache.h"
65
#include "util/thread.h"
66
67
namespace doris {
68
#include "common/compile_check_begin.h"
69
70
// !FIXME: Here we should consider using MutableBlock, due to potential data reorganization
71
Status OlapTableBlockConvertor::validate_and_convert_block(RuntimeState* state, Block* input_block,
72
                                                           std::shared_ptr<Block>& block,
73
                                                           VExprContextSPtrs output_vexpr_ctxs,
74
9
                                                           size_t rows, bool& has_filtered_rows) {
75
9
    DCHECK(input_block->rows() > 0);
76
77
9
    block = Block::create_shared(input_block->get_columns_with_type_and_name());
78
9
    if (!output_vexpr_ctxs.empty()) {
79
        // Do vectorized expr here to speed up load
80
0
        RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(
81
0
                output_vexpr_ctxs, *input_block, block.get()));
82
0
    }
83
84
9
    if (_is_partial_update_and_auto_inc) {
85
        // If this load is partial update and this table has a auto inc column,
86
        // e.g. table schema: k1, v1, v2(auto inc)
87
        // 1. insert columns include auto inc column
88
        // e.g. insert into table (k1, v2) value(a, 1);
89
        // we do nothing.
90
        // 2. insert columns do not include auto inc column
91
        // e.g. insert into table (k1, v1) value(a, a);
92
        // we need to fill auto_inc_cols by creating a new column.
93
0
        if (!_auto_inc_col_idx.has_value()) {
94
0
            RETURN_IF_ERROR(_partial_update_fill_auto_inc_cols(block.get(), rows));
95
0
        }
96
9
    } else if (_auto_inc_col_idx.has_value()) {
97
        // fill the valus for auto-increment columns
98
0
        DCHECK_EQ(_is_partial_update_and_auto_inc, false);
99
0
        RETURN_IF_ERROR(_fill_auto_inc_cols(block.get(), rows));
100
0
    }
101
102
9
    int filtered_rows = 0;
103
9
    {
104
9
        SCOPED_RAW_TIMER(&_validate_data_ns);
105
9
        _filter_map.clear();
106
9
        _filter_map.resize(rows, 0);
107
9
        auto st = _validate_data(state, block.get(), rows, filtered_rows);
108
9
        _num_filtered_rows += filtered_rows;
109
9
        has_filtered_rows = filtered_rows > 0;
110
9
        if (!st.ok()) {
111
0
            return st;
112
0
        }
113
9
        _convert_to_dest_desc_block(block.get());
114
9
    }
115
116
0
    return Status::OK();
117
9
}
118
119
void OlapTableBlockConvertor::init_autoinc_info(int64_t db_id, int64_t table_id, int batch_size,
120
                                                bool is_partial_update_and_auto_inc,
121
2
                                                int32_t auto_increment_column_unique_id) {
122
2
    _batch_size = batch_size;
123
2
    if (is_partial_update_and_auto_inc) {
124
0
        _is_partial_update_and_auto_inc = is_partial_update_and_auto_inc;
125
0
        _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
126
0
                db_id, table_id, auto_increment_column_unique_id);
127
0
        return;
128
0
    }
129
4
    for (size_t idx = 0; idx < _output_tuple_desc->slots().size(); idx++) {
130
2
        if (_output_tuple_desc->slots()[idx]->is_auto_increment()) {
131
0
            _auto_inc_col_idx = idx;
132
0
            _auto_inc_id_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer(
133
0
                    db_id, table_id, _output_tuple_desc->slots()[idx]->col_unique_id());
134
0
            _auto_inc_id_buffer->set_batch_size_at_least(_batch_size);
135
0
            break;
136
0
        }
137
2
    }
138
2
}
139
140
template <bool is_min>
141
0
DecimalV2Value OlapTableBlockConvertor::_get_decimalv2_min_or_max(const DataTypePtr& type) {
142
0
    std::map<std::pair<int, int>, DecimalV2Value>* pmap;
143
0
    if constexpr (is_min) {
144
0
        pmap = &_min_decimalv2_val;
145
0
    } else {
146
0
        pmap = &_max_decimalv2_val;
147
0
    }
148
149
    // found
150
0
    auto iter = pmap->find(
151
0
            {remove_nullable(type)->get_precision(), remove_nullable(type)->get_scale()});
152
0
    if (iter != pmap->end()) {
153
0
        return iter->second;
154
0
    }
155
156
    // save min or max DecimalV2Value for next time
157
0
    DecimalV2Value value;
158
0
    if constexpr (is_min) {
159
0
        value.to_min_decimal(type->get_precision(), type->get_scale());
160
0
    } else {
161
0
        value.to_max_decimal(type->get_precision(), type->get_scale());
162
0
    }
163
0
    pmap->emplace(std::pair<int, int> {type->get_precision(), type->get_scale()}, value);
164
0
    return value;
165
0
}
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb0EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv2_min_or_maxILb1EEENS_14DecimalV2ValueERKSt10shared_ptrIKNS_9IDataTypeEE
166
167
template <typename DecimalType, bool IsMin>
168
0
DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const DataTypePtr& type) {
169
0
    std::map<int, typename DecimalType::NativeType>* pmap;
170
0
    if constexpr (std::is_same_v<DecimalType, Decimal32>) {
171
0
        pmap = IsMin ? &_min_decimal32_val : &_max_decimal32_val;
172
0
    } else if constexpr (std::is_same_v<DecimalType, Decimal64>) {
173
0
        pmap = IsMin ? &_min_decimal64_val : &_max_decimal64_val;
174
0
    } else if constexpr (std::is_same_v<DecimalType, Decimal128V3>) {
175
0
        pmap = IsMin ? &_min_decimal128_val : &_max_decimal128_val;
176
0
    } else {
177
0
        pmap = IsMin ? &_min_decimal256_val : &_max_decimal256_val;
178
0
    }
179
180
    // found
181
0
    auto iter = pmap->find(type->get_precision());
182
0
    if (iter != pmap->end()) {
183
0
        return DecimalType(iter->second);
184
0
    }
185
186
0
    DecimalType value;
187
0
    if constexpr (IsMin) {
188
0
        value = min_decimal_value<DecimalType::PType>(type->get_precision());
189
0
    } else {
190
0
        value = max_decimal_value<DecimalType::PType>(type->get_precision());
191
0
    }
192
0
    pmap->emplace(type->get_precision(), value.value);
193
0
    return value;
194
0
}
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIiEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIlEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_12Decimal128V3ELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb0EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
Unexecuted instantiation: _ZN5doris23OlapTableBlockConvertor25_get_decimalv3_min_or_maxINS_7DecimalIN4wide7integerILm256EiEEEELb1EEET_RKSt10shared_ptrIKNS_9IDataTypeEE
195
196
Status OlapTableBlockConvertor::_internal_validate_column(RuntimeState* state, Block* block,
197
                                                          const DataTypePtr& type, ColumnPtr column,
198
                                                          size_t slot_index,
199
                                                          fmt::memory_buffer& error_prefix,
200
                                                          const size_t row_count,
201
9
                                                          IColumn::Permutation* rows) {
202
9
    DCHECK((rows == nullptr) || (rows->size() == row_count));
203
9
    fmt::memory_buffer error_msg;
204
9
    auto set_invalid_and_append_error_msg = [&](size_t row) {
205
0
        _filter_map[row] = true;
206
0
        auto ret = state->append_error_msg_to_file([]() -> std::string { return ""; },
207
0
                                                   [&error_prefix, &error_msg]() -> std::string {
208
0
                                                       return fmt::to_string(error_prefix) +
209
0
                                                              fmt::to_string(error_msg);
210
0
                                                   });
211
0
        error_msg.clear();
212
0
        return ret;
213
0
    };
214
215
9
    const auto* column_ptr = check_and_get_column<ColumnNullable>(*column);
216
9
    const auto& real_column_ptr =
217
9
            column_ptr == nullptr ? column : (column_ptr->get_nested_column_ptr());
218
9
    const auto* null_map = column_ptr == nullptr ? nullptr : column_ptr->get_null_map_data().data();
219
9
    auto need_to_validate = [](size_t j, size_t row, const std::vector<char>& filter_map,
220
9
                               const unsigned char* null_map) {
221
0
        return !filter_map[row] && (null_map == nullptr || null_map[j] == 0);
222
0
    };
223
224
    // may change orig_column if substring function is performed
225
9
    auto string_column_checker = [&state, &error_msg, need_to_validate,
226
9
                                  set_invalid_and_append_error_msg](
227
9
                                         ColumnPtr& orig_column, const DataTypePtr& orig_type,
228
9
                                         IColumn::Permutation* rows,
229
9
                                         const std::vector<char>& filter_map) {
230
0
        int limit = config::string_type_length_soft_limit_bytes;
231
0
        int len = -1;
232
        // when type.len is negative, std::min will return overflow value, so we need to check it
233
0
        const auto* type_str =
234
0
                check_and_get_data_type<DataTypeString>(remove_nullable(orig_type).get());
235
0
        if (type_str) {
236
0
            if (type_str->len() >= 0) {
237
0
                len = type_str->len();
238
0
                limit = std::min(limit, type_str->len());
239
0
            }
240
0
        }
241
242
0
        const auto* tmp_column_ptr = check_and_get_column<ColumnNullable>(*orig_column);
243
0
        const auto& tmp_real_column_ptr =
244
0
                tmp_column_ptr == nullptr ? orig_column : (tmp_column_ptr->get_nested_column_ptr());
245
0
        const auto* column_string = assert_cast<const ColumnString*>(tmp_real_column_ptr.get());
246
0
        const auto* null_map =
247
0
                tmp_column_ptr == nullptr ? nullptr : tmp_column_ptr->get_null_map_data().data();
248
249
0
        const auto* __restrict offsets = column_string->get_offsets().data();
250
0
        int invalid_count = 0;
251
0
        size_t row_count = orig_column->size();
252
0
        for (int64_t j = 0; j < row_count; ++j) {
253
0
            invalid_count += (offsets[j] - offsets[j - 1]) > limit;
254
0
        }
255
256
0
        if (invalid_count) {
257
            // For string column, if in non-strict load mode(for both insert stmt and stream load),
258
            // truncate the string to schema len.
259
            // After truncation, still need to check if byte len of each row exceed the schema len,
260
            // because currently the schema len is defined in bytes, and substring works by unit of chars.
261
            // This is a workaround for now, need to improve it after better support of multi-byte chars.
262
0
            if (type_str && !state->enable_insert_strict()) {
263
0
                ColumnsWithTypeAndName argument_template;
264
0
                auto input_type = remove_nullable(orig_type);
265
0
                auto pos_type = DataTypeFactory::instance().create_data_type(
266
0
                        FieldType::OLAP_FIELD_TYPE_INT, 0, 0);
267
0
                auto len_type = DataTypeFactory::instance().create_data_type(
268
0
                        FieldType::OLAP_FIELD_TYPE_INT, 0, 0);
269
0
                argument_template.emplace_back(nullptr, input_type, "string column");
270
0
                argument_template.emplace_back(nullptr, pos_type, "pos column");
271
0
                argument_template.emplace_back(nullptr, len_type, "len column");
272
0
                auto func = SimpleFunctionFactory::instance().get_function(
273
0
                        "substring", argument_template, input_type, {}, state->be_exec_version());
274
0
                if (!func) {
275
0
                    return Status::InternalError("get function substring failed");
276
0
                }
277
0
                auto pos_column = pos_type->create_column_const(row_count, to_field<TYPE_INT>(1));
278
0
                auto len_column =
279
0
                        len_type->create_column_const(row_count, to_field<TYPE_INT>(limit));
280
0
                Block tmp_block({{remove_nullable(orig_column), input_type, "string column"},
281
0
                                 {pos_column, pos_type, "pos"},
282
0
                                 {len_column, len_type, "len"},
283
0
                                 {nullptr, input_type, "result"}});
284
0
                RETURN_IF_ERROR(func->execute(nullptr, tmp_block, {0, 1, 2}, 3, row_count));
285
0
                column_string =
286
0
                        assert_cast<const ColumnString*>(tmp_block.get_by_position(3).column.get());
287
0
                orig_column =
288
0
                        orig_column->is_nullable()
289
0
                                ? ColumnNullable::create(tmp_block.get_by_position(3).column,
290
0
                                                         tmp_column_ptr->get_null_map_column_ptr())
291
0
                                : std::move(tmp_block.get_by_position(3).column);
292
0
            }
293
0
            for (size_t j = 0; j < row_count; ++j) {
294
0
                auto row = rows ? (*rows)[j] : j;
295
0
                if (need_to_validate(j, row, filter_map, null_map)) {
296
0
                    auto str_val = column_string->get_data_at(j);
297
0
                    bool invalid = str_val.size > limit;
298
0
                    if (invalid) {
299
0
                        if (str_val.size > len) {
300
0
                            fmt::format_to(error_msg, "{}",
301
0
                                           "the length of input is too long than schema. ");
302
0
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
303
0
                                           str_val.to_prefix(32));
304
0
                            fmt::format_to(error_msg, "schema length: {}; ", len);
305
0
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
306
0
                        } else if (str_val.size > limit) {
307
0
                            fmt::format_to(
308
0
                                    error_msg, "{}",
309
0
                                    "the length of input string is too long than vec schema. ");
310
0
                            fmt::format_to(error_msg, "first 32 bytes of input str: [{}] ",
311
0
                                           str_val.to_prefix(32));
312
0
                            fmt::format_to(error_msg, "schema length: {}; ", len);
313
0
                            fmt::format_to(error_msg, "limit length: {}; ", limit);
314
0
                            fmt::format_to(error_msg, "actual length: {}; ", str_val.size);
315
0
                        }
316
0
                        RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
317
0
                    }
318
0
                }
319
0
            }
320
0
        }
321
0
        return Status::OK();
322
0
    };
323
324
9
    switch (type->get_primitive_type()) {
325
0
    case TYPE_CHAR:
326
0
    case TYPE_VARCHAR:
327
0
    case TYPE_STRING: {
328
0
        RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map));
329
0
        block->get_by_position(slot_index).column = std::move(column);
330
0
        break;
331
0
    }
332
0
    case TYPE_JSONB: {
333
0
        const auto* column_string = assert_cast<const ColumnString*>(real_column_ptr.get());
334
0
        for (size_t j = 0; j < row_count; ++j) {
335
0
            if (!_filter_map[j]) {
336
0
                if (type->is_nullable() && column_ptr && column_ptr->is_null_at(j)) {
337
0
                    continue;
338
0
                }
339
0
                auto str_val = column_string->get_data_at(j);
340
0
                bool invalid = str_val.size == 0;
341
0
                if (invalid) {
342
0
                    error_msg.clear();
343
0
                    fmt::format_to(error_msg, "{}", "jsonb with size 0 is invalid");
344
0
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(j));
345
0
                }
346
0
            }
347
0
        }
348
0
        break;
349
0
    }
350
0
    case TYPE_DECIMALV2: {
351
        // column_decimal utilizes the ColumnPtr from the block* block in _validate_data and can be modified.
352
0
        auto* column_decimal = const_cast<ColumnDecimal128V2*>(
353
0
                assert_cast<const ColumnDecimal128V2*>(real_column_ptr.get()));
354
0
        const auto& max_decimalv2 = _get_decimalv2_min_or_max<false>(type);
355
0
        const auto& min_decimalv2 = _get_decimalv2_min_or_max<true>(type);
356
0
        for (size_t j = 0; j < row_count; ++j) {
357
0
            auto row = rows ? (*rows)[j] : j;
358
0
            if (need_to_validate(j, row, _filter_map, null_map)) {
359
0
                auto dec_val = binary_cast<Int128, DecimalV2Value>(column_decimal->get_data()[j]);
360
0
                bool invalid = false;
361
362
0
                if (dec_val.greater_than_scale(type->get_scale())) {
363
0
                    auto code =
364
0
                            dec_val.round(&dec_val, remove_nullable(type)->get_scale(), HALF_UP);
365
0
                    column_decimal->get_data()[j] = dec_val;
366
367
0
                    if (code != E_DEC_OK) {
368
0
                        fmt::format_to(error_msg, "round one decimal failed.value={}; ",
369
0
                                       dec_val.to_string());
370
0
                        invalid = true;
371
0
                    }
372
0
                }
373
0
                if (dec_val > max_decimalv2 || dec_val < min_decimalv2) {
374
0
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition");
375
0
                    fmt::format_to(error_msg, ", value={}", dec_val.to_string());
376
0
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),
377
0
                                   type->get_scale());
378
0
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimalv2.to_string(),
379
0
                                   max_decimalv2.to_string());
380
0
                    invalid = true;
381
0
                }
382
383
0
                if (invalid) {
384
0
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
385
0
                }
386
0
            }
387
0
        }
388
0
        break;
389
0
    }
390
0
    case TYPE_DECIMAL32: {
391
0
#define CHECK_VALIDATION_FOR_DECIMALV3(DecimalType)                                               \
392
0
    auto column_decimal =                                                                         \
393
0
            assert_cast<const ColumnDecimal<DecimalType::PType>*>(real_column_ptr.get());         \
394
0
    const auto& max_decimal = _get_decimalv3_min_or_max<DecimalType, false>(type);                \
395
0
    const auto& min_decimal = _get_decimalv3_min_or_max<DecimalType, true>(type);                 \
396
0
    const auto* __restrict datas = column_decimal->get_data().data();                             \
397
0
    int invalid_count = 0;                                                                        \
398
0
    for (int j = 0; j < row_count; ++j) {                                                         \
399
0
        const auto dec_val = datas[j];                                                            \
400
0
        invalid_count += dec_val > max_decimal || dec_val < min_decimal;                          \
401
0
    }                                                                                             \
402
0
    if (invalid_count) {                                                                          \
403
0
        for (size_t j = 0; j < row_count; ++j) {                                                  \
404
0
            auto row = rows ? (*rows)[j] : j;                                                     \
405
0
            if (need_to_validate(j, row, _filter_map, null_map)) {                                \
406
0
                auto dec_val = column_decimal->get_data()[j];                                     \
407
0
                bool invalid = false;                                                             \
408
0
                if (dec_val > max_decimal || dec_val < min_decimal) {                             \
409
0
                    fmt::format_to(error_msg, "{}", "decimal value is not valid for definition"); \
410
0
                    fmt::format_to(error_msg, ", value={}", dec_val.value);                       \
411
0
                    fmt::format_to(error_msg, ", precision={}, scale={}", type->get_precision(),  \
412
0
                                   type->get_scale());                                            \
413
0
                    fmt::format_to(error_msg, ", min={}, max={}; ", min_decimal.value,            \
414
0
                                   max_decimal.value);                                            \
415
0
                    invalid = true;                                                               \
416
0
                }                                                                                 \
417
0
                if (invalid) {                                                                    \
418
0
                    RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));                       \
419
0
                }                                                                                 \
420
0
            }                                                                                     \
421
0
        }                                                                                         \
422
0
    }
423
0
        CHECK_VALIDATION_FOR_DECIMALV3(Decimal32);
424
0
        break;
425
0
    }
426
0
    case TYPE_DECIMAL64: {
427
0
        CHECK_VALIDATION_FOR_DECIMALV3(Decimal64);
428
0
        break;
429
0
    }
430
0
    case TYPE_DECIMAL128I: {
431
0
        CHECK_VALIDATION_FOR_DECIMALV3(Decimal128V3);
432
0
        break;
433
0
    }
434
0
    case TYPE_DECIMAL256: {
435
0
        CHECK_VALIDATION_FOR_DECIMALV3(Decimal256);
436
0
        break;
437
0
    }
438
0
#undef CHECK_VALIDATION_FOR_DECIMALV3
439
0
    case TYPE_ARRAY: {
440
0
        const auto* column_array = assert_cast<const ColumnArray*>(real_column_ptr.get());
441
0
        const auto* type_array = assert_cast<const DataTypeArray*>(remove_nullable(type).get());
442
0
        auto nested_type = type_array->get_nested_type();
443
0
        const auto& offsets = column_array->get_offsets();
444
0
        IColumn::Permutation permutation(offsets.back());
445
0
        for (size_t r = 0; r < row_count; ++r) {
446
0
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
447
0
                permutation[c] = rows ? (*rows)[r] : r;
448
0
            }
449
0
        }
450
0
        fmt::format_to(error_prefix, "ARRAY type failed: ");
451
0
        auto data_column_ptr = column_array->get_data_ptr();
452
0
        switch (nested_type->get_primitive_type()) {
453
0
        case TYPE_CHAR:
454
0
        case TYPE_VARCHAR:
455
0
        case TYPE_STRING: {
456
0
            RETURN_IF_ERROR(
457
0
                    string_column_checker(data_column_ptr, nested_type, &permutation, _filter_map));
458
0
            const_cast<ColumnArray*>(column_array)->get_data_ptr() = std::move(data_column_ptr);
459
0
            break;
460
0
        }
461
0
        default:
462
0
            RETURN_IF_ERROR(_validate_column(state, block, nested_type, data_column_ptr, slot_index,
463
0
                                             error_prefix, permutation.size(), &permutation));
464
0
            break;
465
0
        }
466
0
        break;
467
0
    }
468
0
    case TYPE_MAP: {
469
0
        const auto* column_map = assert_cast<const ColumnMap*>(real_column_ptr.get());
470
        // column_map utilizes the ColumnPtr from the block* block in _validate_data and can be modified.
471
0
        RETURN_IF_ERROR((const_cast<ColumnMap*>(column_map))->deduplicate_keys(true));
472
473
0
        const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(type).get());
474
0
        auto key_type = type_map->get_key_type();
475
0
        auto val_type = type_map->get_value_type();
476
0
        const auto& offsets = column_map->get_offsets();
477
0
        IColumn::Permutation permutation(offsets.back());
478
0
        for (size_t r = 0; r < row_count; ++r) {
479
0
            for (size_t c = offsets[r - 1]; c < offsets[r]; ++c) {
480
0
                permutation[c] = rows ? (*rows)[r] : r;
481
0
            }
482
0
        }
483
484
0
        fmt::format_to(error_prefix, "MAP type failed: ");
485
0
        switch (key_type->get_primitive_type()) {
486
0
        case TYPE_CHAR:
487
0
        case TYPE_VARCHAR:
488
0
        case TYPE_STRING: {
489
0
            auto key_column_ptr = column_map->get_keys_ptr();
490
0
            RETURN_IF_ERROR(
491
0
                    string_column_checker(key_column_ptr, key_type, &permutation, _filter_map));
492
0
            const_cast<ColumnMap*>(column_map)->get_keys_ptr() = std::move(key_column_ptr);
493
0
            break;
494
0
        }
495
0
        default:
496
0
            RETURN_IF_ERROR(_validate_column(state, block, key_type, column_map->get_keys_ptr(),
497
0
                                             slot_index, error_prefix, permutation.size(),
498
0
                                             &permutation));
499
0
            break;
500
0
        }
501
502
0
        switch (val_type->get_primitive_type()) {
503
0
        case TYPE_CHAR:
504
0
        case TYPE_VARCHAR:
505
0
        case TYPE_STRING: {
506
0
            auto value_column_ptr = column_map->get_values_ptr();
507
0
            RETURN_IF_ERROR(
508
0
                    string_column_checker(value_column_ptr, val_type, &permutation, _filter_map));
509
0
            const_cast<ColumnMap*>(column_map)->get_values_ptr() = std::move(value_column_ptr);
510
0
            break;
511
0
        }
512
0
        default:
513
0
            RETURN_IF_ERROR(_validate_column(state, block, val_type, column_map->get_values_ptr(),
514
0
                                             slot_index, error_prefix, permutation.size(),
515
0
                                             &permutation));
516
0
            break;
517
0
        }
518
0
        break;
519
0
    }
520
0
    case TYPE_STRUCT: {
521
0
        const auto column_struct = assert_cast<const ColumnStruct*>(real_column_ptr.get());
522
0
        const auto* type_struct = assert_cast<const DataTypeStruct*>(remove_nullable(type).get());
523
0
        DCHECK(type_struct->get_elements().size() == column_struct->tuple_size());
524
0
        fmt::format_to(error_prefix, "STRUCT type failed: ");
525
0
        for (size_t sc = 0; sc < column_struct->tuple_size(); ++sc) {
526
0
            auto element_type = type_struct->get_element(sc);
527
0
            switch (element_type->get_primitive_type()) {
528
0
            case TYPE_CHAR:
529
0
            case TYPE_VARCHAR:
530
0
            case TYPE_STRING: {
531
0
                auto element_column_ptr = column_struct->get_column_ptr(sc);
532
0
                RETURN_IF_ERROR(string_column_checker(element_column_ptr, element_type, nullptr,
533
0
                                                      _filter_map));
534
0
                const_cast<ColumnStruct*>(column_struct)->get_column_ptr(sc) =
535
0
                        std::move(element_column_ptr);
536
0
                break;
537
0
            }
538
0
            default:
539
0
                RETURN_IF_ERROR(_validate_column(state, block, type_struct->get_element(sc),
540
0
                                                 column_struct->get_column_ptr(sc), slot_index,
541
0
                                                 error_prefix,
542
0
                                                 column_struct->get_column_ptr(sc)->size()));
543
0
                break;
544
0
            }
545
0
        }
546
0
        break;
547
0
    }
548
0
    case TYPE_AGG_STATE: {
549
0
        auto* column_string = check_and_get_column<ColumnString>(*real_column_ptr);
550
0
        if (column_string) {
551
0
            RETURN_IF_ERROR(string_column_checker(column, type, rows, _filter_map));
552
0
        }
553
0
        break;
554
0
    }
555
9
    default:
556
9
        break;
557
9
    }
558
559
    // Dispose the column should do not contain the NULL value
560
    // Only two case:
561
    // 1. column is nullable but the desc is not nullable
562
    // 2. desc->type is BITMAP
563
9
    if ((!type->is_nullable() || type->get_primitive_type() == TYPE_BITMAP) && column_ptr) {
564
0
        for (int j = 0; j < row_count; ++j) {
565
0
            auto row = rows ? (*rows)[j] : j;
566
0
            if (null_map[j] && !_filter_map[row]) {
567
0
                fmt::format_to(error_msg, "null value for not null column, type={}",
568
0
                               type->get_name());
569
0
                RETURN_IF_ERROR(set_invalid_and_append_error_msg(row));
570
0
            }
571
0
        }
572
0
    }
573
574
9
    return Status::OK();
575
9
}
576
577
Status OlapTableBlockConvertor::_validate_data(RuntimeState* state, Block* block, const size_t rows,
578
9
                                               int& filtered_rows) {
579
9
    filtered_rows = 0;
580
9
    Defer defer {[&] {
581
26
        for (int i = 0; i < rows; ++i) {
582
17
            filtered_rows += _filter_map[i];
583
17
        }
584
9
    }};
585
18
    for (int i = 0; i < _output_tuple_desc->slots().size(); ++i) {
586
9
        SlotDescriptor* desc = _output_tuple_desc->slots()[i];
587
9
        block->get_by_position(i).column =
588
9
                block->get_by_position(i).column->convert_to_full_column_if_const();
589
9
        const auto& column = block->get_by_position(i).column;
590
591
9
        fmt::memory_buffer error_prefix;
592
9
        fmt::format_to(error_prefix, "column_name[{}], ", desc->col_name());
593
9
        RETURN_IF_ERROR(
594
9
                _validate_column(state, block, desc->type(), column, i, error_prefix, rows));
595
9
    }
596
9
    return Status::OK();
597
9
}
598
599
9
// Aligns the nullability of the block's leading columns with the output slots.
//
// Walks positions that exist both in _output_tuple_desc->slots() and in
// `block`. Where the slot's nullability differs from the column type's:
//   - slot nullable, column not: wrap both type and column with make_nullable();
//   - slot not nullable, column nullable: replace type and column with their
//     nested (non-nullable) counterparts.
// Positions whose nullability already matches are left untouched.
void OlapTableBlockConvertor::_convert_to_dest_desc_block(doris::Block* block) {
    const auto& slots = _output_tuple_desc->slots();
    for (int pos = 0; pos < slots.size() && pos < block->columns(); ++pos) {
        SlotDescriptor* slot = slots[pos];
        auto& entry = block->get_by_position(pos);

        const bool slot_nullable = slot->is_nullable();
        if (slot_nullable == entry.type->is_nullable()) {
            continue;
        }

        if (slot_nullable) {
            entry.type = make_nullable(entry.type);
            entry.column = make_nullable(entry.column);
        } else {
            // Strip the nullable wrapper; the null map is dropped here.
            entry.type = assert_cast<const DataTypeNullable&>(*entry.type).get_nested_type();
            entry.column =
                    assert_cast<const ColumnNullable&>(*entry.column).get_nested_column_ptr();
        }
    }
}
617
618
0
// Rebuilds the auto-increment column (position _auto_inc_col_idx) of `block`
// as a ColumnInt64 holding one value per row.
//
// Behaviour depends on the concrete type of the source column:
//   - ColumnConst that is null at row 0: `rows` ids are requested and every row
//     receives a generated id.
//   - ColumnConst that is not null: every row receives that literal value.
//   - ColumnNullable: as many ids as there are nulls among the first `rows`
//     entries are requested; null rows receive a generated id, other rows keep
//     the nested column's integer value.
//   - anything else: the block is left unchanged and OK is returned.
//
// On the first three paths the block entry's column is replaced and its type
// is set to remove_nullable(slot->type()).
//
// Returns a non-OK status if requesting ids from _auto_inc_id_buffer fails.
Status OlapTableBlockConvertor::_fill_auto_inc_cols(Block* block, size_t rows) {
    const size_t idx = _auto_inc_col_idx.value();
    SlotDescriptor* slot = _output_tuple_desc->slots()[idx];
    DCHECK(slot->type()->get_primitive_type() == PrimitiveType::TYPE_BIGINT);
    DCHECK(!slot->is_nullable());

    // Requests `count` ids and hands every returned (start, length) range to
    // the allocator.
    auto request_ids = [this](size_t count) -> Status {
        std::vector<std::pair<int64_t, size_t>> ranges;
        RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(count, &ranges));
        for (auto [first_id, range_len] : ranges) {
            _auto_inc_id_allocator.insert_ids(first_id, range_len);
        }
        return Status::OK();
    };

    auto dst_column = ColumnInt64::create();
    ColumnInt64::Container& dst_values = dst_column->get_data();
    ColumnPtr src_column_ptr = block->get_by_position(idx).column;

    if (const auto* const_column = check_and_get_column<ColumnConst>(src_column_ptr.get())) {
        // For insert statements such as
        //   "insert into tbl1 select null,col1,col2,... from tbl2" or
        //   "insert into tbl1 select 1,col1,col2,... from tbl2"
        // the literal arrives as a `ColumnConst`.
        if (!const_column->is_null_at(0)) {
            // All inputs are the same int64 literal: replicate it.
            const int64_t literal = const_column->get_int(0);
            dst_values.resize_fill(rows, literal);
        } else {
            // All inputs are null literals: every row gets a generated id.
            RETURN_IF_ERROR(request_ids(rows));
            for (size_t row = 0; row < rows; row++) {
                dst_values.emplace_back(_auto_inc_id_allocator.next_id());
            }
        }
    } else if (const auto* src_nullable_column =
                       check_and_get_column<ColumnNullable>(src_column_ptr.get())) {
        auto src_nested_column_ptr = src_nullable_column->get_nested_column_ptr();
        const auto& null_map_data = src_nullable_column->get_null_map_data();
        dst_values.reserve(rows);

        // Only null rows need a generated id, so count them first.
        size_t null_rows = 0;
        for (size_t row = 0; row < rows; row++) {
            null_rows += null_map_data[row];
        }
        RETURN_IF_ERROR(request_ids(null_rows));

        for (size_t row = 0; row < rows; row++) {
            if (null_map_data[row] != 0) {
                dst_values.emplace_back(_auto_inc_id_allocator.next_id());
            } else {
                dst_values.emplace_back(src_nested_column_ptr->get_int(row));
            }
        }
    } else {
        // Neither const nor nullable: nothing to fill in.
        return Status::OK();
    }

    auto& target = block->get_by_position(idx);
    target.column = std::move(dst_column);
    target.type = remove_nullable(slot->type());
    return Status::OK();
}
677
678
0
// Appends a new Int64 column named BeConsts::PARTIAL_UPDATE_AUTO_INC_COL to
// `block`, holding one generated auto-increment id per row.
//
// `rows` ids are requested from _auto_inc_id_buffer, the returned
// (start, length) ranges are handed to _auto_inc_id_allocator, and the column
// is filled by calling next_id() `rows` times.
//
// Returns a non-OK status if requesting the ids fails; in that case the block
// is not modified.
Status OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(Block* block, size_t rows) {
    auto id_column = ColumnInt64::create();
    ColumnInt64::Container& ids = id_column->get_data();

    // Every row needs a fresh id, so request exactly `rows` of them.
    std::vector<std::pair<int64_t, size_t>> id_ranges;
    RETURN_IF_ERROR(_auto_inc_id_buffer->sync_request_ids(rows, &id_ranges));
    for (auto [first_id, range_len] : id_ranges) {
        _auto_inc_id_allocator.insert_ids(first_id, range_len);
    }

    for (size_t row = 0; row < rows; row++) {
        ids.emplace_back(_auto_inc_id_allocator.next_id());
    }

    block->insert(ColumnWithTypeAndName(std::move(id_column), std::make_shared<DataTypeInt64>(),
                                        BeConsts::PARTIAL_UPDATE_AUTO_INC_COL));
    return Status::OK();
}
695
696
} // namespace doris