Coverage Report

Created: 2025-03-13 11:28

/root/doris/be/src/exec/tablet_info.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/tablet_info.h"
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Partitions_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <gen_cpp/descriptors.pb.h>
25
#include <glog/logging.h>
26
27
#include <algorithm>
28
#include <cstddef>
29
#include <cstdint>
30
#include <memory>
31
#include <ostream>
32
#include <string>
33
#include <tuple>
34
35
#include "common/exception.h"
36
#include "common/logging.h"
37
#include "common/status.h"
38
#include "olap/tablet_schema.h"
39
#include "runtime/define_primitive_type.h"
40
#include "runtime/descriptors.h"
41
#include "runtime/large_int_value.h"
42
#include "runtime/memory/mem_tracker.h"
43
#include "runtime/primitive_type.h"
44
#include "runtime/raw_value.h"
45
#include "runtime/types.h"
46
#include "util/string_parser.hpp"
47
#include "util/string_util.h"
48
#include "vec/columns/column.h"
49
// NOLINTNEXTLINE(unused-includes)
50
#include "vec/exprs/vexpr_context.h" // IWYU pragma: keep
51
#include "vec/exprs/vliteral.h"
52
#include "vec/runtime/vdatetime_value.h"
53
54
namespace doris {
55
56
37
void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
57
37
    pindex->set_id(index_id);
58
37
    pindex->set_schema_hash(schema_hash);
59
37
    for (auto* slot : slots) {
60
0
        pindex->add_columns(slot->col_name());
61
0
    }
62
37
    for (auto* column : columns) {
63
0
        column->to_schema_pb(pindex->add_columns_desc());
64
0
    }
65
37
    for (auto* index : indexes) {
66
0
        index->to_schema_pb(pindex->add_indexes_desc());
67
0
    }
68
37
}
69
70
bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator& lhs,
71
15
                                             const BlockRowWithIndicator& rhs) const {
72
15
    vectorized::Block* l_block = std::get<0>(lhs);
73
15
    vectorized::Block* r_block = std::get<0>(rhs);
74
15
    int32_t l_row = std::get<1>(lhs);
75
15
    int32_t r_row = std::get<1>(rhs);
76
15
    bool l_use_new = std::get<2>(lhs);
77
15
    bool r_use_new = std::get<2>(rhs);
78
79
15
    VLOG_TRACE << '\n' << l_block->dump_data() << '\n' << r_block->dump_data();
80
81
15
    if (l_row == -1) {
82
0
        return false;
83
15
    } else if (r_row == -1) {
84
15
        return true;
85
15
    }
86
87
0
    if (_param_locs.empty()) { // no transform, use origin column
88
0
        for (auto slot_loc : _slot_locs) {
89
0
            auto res = l_block->get_by_position(slot_loc).column->compare_at(
90
0
                    l_row, r_row, *r_block->get_by_position(slot_loc).column, -1);
91
0
            if (res != 0) {
92
0
                return res < 0;
93
0
            }
94
0
        }
95
0
    } else { // use transformed column to compare
96
0
        DCHECK(_slot_locs.size() == _param_locs.size())
97
0
                << _slot_locs.size() << ' ' << _param_locs.size();
98
99
0
        const std::vector<uint16_t>* l_index = l_use_new ? &_param_locs : &_slot_locs;
100
0
        const std::vector<uint16_t>* r_index = r_use_new ? &_param_locs : &_slot_locs;
101
102
0
        for (int i = 0; i < _slot_locs.size(); i++) {
103
0
            vectorized::ColumnPtr l_col = l_block->get_by_position((*l_index)[i]).column;
104
0
            vectorized::ColumnPtr r_col = r_block->get_by_position((*r_index)[i]).column;
105
106
0
            auto res = l_col->compare_at(l_row, r_row, *r_col, -1);
107
0
            if (res != 0) {
108
0
                return res < 0;
109
0
            }
110
0
        }
111
0
    }
112
113
    // equal, return false
114
0
    return false;
115
0
}
116
117
14
Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
118
14
    _db_id = pschema.db_id();
119
14
    _table_id = pschema.table_id();
120
14
    _version = pschema.version();
121
14
    _is_partial_update = pschema.partial_update();
122
14
    _is_strict_mode = pschema.is_strict_mode();
123
14
    if (_is_partial_update) {
124
0
        _auto_increment_column = pschema.auto_increment_column();
125
0
        if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
126
0
            return Status::InternalError(
127
0
                    "Auto increment column id is not set in FE. Maybe FE is an older version "
128
0
                    "different from BE.");
129
0
        }
130
0
        _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
131
0
    }
132
14
    _timestamp_ms = pschema.timestamp_ms();
133
14
    if (pschema.has_nano_seconds()) {
134
14
        _nano_seconds = pschema.nano_seconds();
135
14
    }
136
14
    _timezone = pschema.timezone();
137
138
14
    for (const auto& col : pschema.partial_update_input_columns()) {
139
0
        _partial_update_input_columns.insert(col);
140
0
    }
141
14
    std::unordered_map<std::string, SlotDescriptor*> slots_map;
142
143
14
    _tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
144
145
84
    for (const auto& p_slot_desc : pschema.slot_descs()) {
146
84
        auto* slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
147
84
        _tuple_desc->add_slot(slot_desc);
148
84
        string data_type;
149
84
        EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
150
84
        std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
151
84
        std::string data_type_str =
152
84
                std::to_string(int64_t(TabletColumn::get_field_type_by_string(data_type)));
153
84
        slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
154
84
                          slot_desc);
155
84
    }
156
157
28
    for (const auto& p_index : pschema.indexes()) {
158
28
        auto* index = _obj_pool.add(new OlapTableIndexSchema());
159
28
        index->index_id = p_index.id();
160
28
        index->schema_hash = p_index.schema_hash();
161
28
        for (const auto& pcolumn_desc : p_index.columns_desc()) {
162
0
            if (!_is_partial_update ||
163
0
                _partial_update_input_columns.contains(pcolumn_desc.name())) {
164
0
                std::string is_null_str = pcolumn_desc.is_nullable() ? "true" : "false";
165
0
                std::string data_type_str = std::to_string(
166
0
                        int64_t(TabletColumn::get_field_type_by_string(pcolumn_desc.type())));
167
0
                auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
168
0
                                         is_null_str);
169
0
                if (it == std::end(slots_map)) {
170
0
                    return Status::InternalError("unknown index column, column={}, type={}",
171
0
                                                 pcolumn_desc.name(), pcolumn_desc.type());
172
0
                }
173
0
                index->slots.emplace_back(it->second);
174
0
            }
175
0
            TabletColumn* tc = _obj_pool.add(new TabletColumn());
176
0
            tc->init_from_pb(pcolumn_desc);
177
0
            index->columns.emplace_back(tc);
178
0
        }
179
28
        for (const auto& pindex_desc : p_index.indexes_desc()) {
180
0
            TabletIndex* ti = _obj_pool.add(new TabletIndex());
181
0
            ti->init_from_pb(pindex_desc);
182
0
            index->indexes.emplace_back(ti);
183
0
        }
184
28
        _indexes.emplace_back(index);
185
28
    }
186
187
14
    std::sort(_indexes.begin(), _indexes.end(),
188
28
              [](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
189
28
                  return lhs->index_id < rhs->index_id;
190
28
              });
191
14
    return Status::OK();
192
14
}
193
194
21
Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
195
21
    _db_id = tschema.db_id;
196
21
    _table_id = tschema.table_id;
197
21
    _version = tschema.version;
198
21
    _is_partial_update = tschema.is_partial_update;
199
21
    if (tschema.__isset.is_strict_mode) {
200
21
        _is_strict_mode = tschema.is_strict_mode;
201
21
    }
202
21
    if (_is_partial_update) {
203
0
        _auto_increment_column = tschema.auto_increment_column;
204
0
        if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
205
0
            return Status::InternalError(
206
0
                    "Auto increment column id is not set in FE. Maybe FE is an older version "
207
0
                    "different from BE.");
208
0
        }
209
0
        _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
210
0
    }
211
212
21
    for (const auto& tcolumn : tschema.partial_update_input_columns) {
213
0
        _partial_update_input_columns.insert(tcolumn);
214
0
    }
215
21
    std::unordered_map<std::string, SlotDescriptor*> slots_map;
216
21
    _tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
217
122
    for (const auto& t_slot_desc : tschema.slot_descs) {
218
122
        auto* slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
219
122
        _tuple_desc->add_slot(slot_desc);
220
122
        std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
221
122
        std::string data_type_str = std::to_string(int64_t(slot_desc->col_type()));
222
122
        slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
223
122
                          slot_desc);
224
122
    }
225
226
37
    for (const auto& t_index : tschema.indexes) {
227
37
        std::unordered_map<std::string, int32_t> index_slots_map;
228
37
        auto* index = _obj_pool.add(new OlapTableIndexSchema());
229
37
        index->index_id = t_index.id;
230
37
        index->schema_hash = t_index.schema_hash;
231
37
        for (const auto& tcolumn_desc : t_index.columns_desc) {
232
0
            if (!_is_partial_update ||
233
0
                _partial_update_input_columns.contains(tcolumn_desc.column_name)) {
234
0
                std::string is_null_str = tcolumn_desc.is_allow_null ? "true" : "false";
235
0
                std::string data_type_str =
236
0
                        std::to_string(int64_t(thrift_to_type(tcolumn_desc.column_type.type)));
237
0
                auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
238
0
                                         is_null_str);
239
0
                if (it == slots_map.end()) {
240
0
                    return Status::InternalError("unknown index column, column={}, type={}",
241
0
                                                 tcolumn_desc.column_name,
242
0
                                                 tcolumn_desc.column_type.type);
243
0
                }
244
0
                index->slots.emplace_back(it->second);
245
0
            }
246
0
            index_slots_map.emplace(to_lower(tcolumn_desc.column_name), tcolumn_desc.col_unique_id);
247
0
            TabletColumn* tc = _obj_pool.add(new TabletColumn());
248
0
            tc->init_from_thrift(tcolumn_desc);
249
0
            index->columns.emplace_back(tc);
250
0
        }
251
37
        if (t_index.__isset.indexes_desc) {
252
0
            for (const auto& tindex_desc : t_index.indexes_desc) {
253
0
                std::vector<int32_t> column_unique_ids(tindex_desc.columns.size());
254
0
                for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
255
0
                    auto it = index_slots_map.find(to_lower(tindex_desc.columns[i]));
256
0
                    if (it != index_slots_map.end()) {
257
0
                        column_unique_ids[i] = it->second;
258
0
                    }
259
0
                }
260
0
                TabletIndex* ti = _obj_pool.add(new TabletIndex());
261
0
                ti->init_from_thrift(tindex_desc, column_unique_ids);
262
0
                index->indexes.emplace_back(ti);
263
0
            }
264
0
        }
265
37
        if (t_index.__isset.where_clause) {
266
0
            RETURN_IF_ERROR(
267
0
                    vectorized::VExpr::create_expr_tree(t_index.where_clause, index->where_clause));
268
0
        }
269
37
        _indexes.emplace_back(index);
270
37
    }
271
272
21
    std::sort(_indexes.begin(), _indexes.end(),
273
32
              [](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
274
32
                  return lhs->index_id < rhs->index_id;
275
32
              });
276
21
    return Status::OK();
277
21
}
278
279
21
void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
280
21
    pschema->set_db_id(_db_id);
281
21
    pschema->set_table_id(_table_id);
282
21
    pschema->set_version(_version);
283
21
    pschema->set_partial_update(_is_partial_update);
284
21
    pschema->set_is_strict_mode(_is_strict_mode);
285
21
    pschema->set_auto_increment_column(_auto_increment_column);
286
21
    pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
287
21
    pschema->set_timestamp_ms(_timestamp_ms);
288
21
    pschema->set_timezone(_timezone);
289
21
    pschema->set_nano_seconds(_nano_seconds);
290
21
    for (auto col : _partial_update_input_columns) {
291
0
        *pschema->add_partial_update_input_columns() = col;
292
0
    }
293
21
    _tuple_desc->to_protobuf(pschema->mutable_tuple_desc());
294
122
    for (auto* slot : _tuple_desc->slots()) {
295
122
        slot->to_protobuf(pschema->add_slot_descs());
296
122
    }
297
37
    for (auto* index : _indexes) {
298
37
        index->to_protobuf(pschema->add_indexes());
299
37
    }
300
21
}
301
302
0
std::string OlapTableSchemaParam::debug_string() const {
303
0
    std::stringstream ss;
304
0
    ss << "tuple_desc=" << _tuple_desc->debug_string();
305
0
    return ss.str();
306
0
}
307
308
VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema,
309
                                                   const TOlapTablePartitionParam& t_param)
310
        : _schema(schema),
311
          _t_param(t_param),
312
          _slots(_schema->tuple_desc()->slots()),
313
          _mem_tracker(std::make_unique<MemTracker>("OlapTablePartitionParam")),
314
5
          _part_type(t_param.partition_type) {
315
5
    if (t_param.__isset.enable_automatic_partition && t_param.enable_automatic_partition) {
316
0
        _is_auto_partition = true;
317
0
        auto size = t_param.partition_function_exprs.size();
318
0
        _part_func_ctx.resize(size);
319
0
        _partition_function.resize(size);
320
0
        DCHECK((t_param.partition_type == TPartitionType::RANGE_PARTITIONED && size == 1) ||
321
0
               (t_param.partition_type == TPartitionType::LIST_PARTITIONED && size >= 1))
322
0
                << "now support only 1 partition column for auto range partitions. "
323
0
                << t_param.partition_type << " " << size;
324
0
        for (int i = 0; i < size; ++i) {
325
0
            Status st = vectorized::VExpr::create_expr_tree(t_param.partition_function_exprs[i],
326
0
                                                            _part_func_ctx[i]);
327
0
            if (!st.ok()) {
328
0
                throw Exception(Status::InternalError("Partition function expr is not valid"),
329
0
                                "Partition function expr is not valid");
330
0
            }
331
0
            _partition_function[i] = _part_func_ctx[i]->root();
332
0
        }
333
0
    }
334
335
5
    if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) {
336
0
        _is_auto_detect_overwrite = true;
337
0
        DCHECK(t_param.__isset.overwrite_group_id);
338
0
        _overwrite_group_id = t_param.overwrite_group_id;
339
0
    }
340
341
5
    if (_is_auto_partition) {
342
        // the nullable mode depends on partition_exprs. not column slots. so use them.
343
0
        DCHECK(_partition_function.size() <= _slots.size())
344
0
                << _partition_function.size() << ", " << _slots.size();
345
346
        // suppose (k0, [k1], [k2]), so get [k1, 0], [k2, 1]
347
0
        std::map<std::string, int> partition_slots_map; // name to idx in part_exprs
348
0
        for (size_t i = 0; i < t_param.partition_columns.size(); i++) {
349
0
            partition_slots_map.emplace(t_param.partition_columns[i], i);
350
0
        }
351
352
        // here we rely on the same order and number of the _part_funcs and _slots in the prefix
353
        // _part_block contains all slots of table.
354
0
        for (auto* slot : _slots) {
355
            // try to replace with partition expr.
356
0
            if (auto it = partition_slots_map.find(slot->col_name());
357
0
                it != partition_slots_map.end()) { // it's a partition column slot
358
0
                auto& expr_type = _partition_function[it->second]->data_type();
359
0
                _partition_block.insert({expr_type->create_column(), expr_type, slot->col_name()});
360
0
            } else {
361
0
                _partition_block.insert({slot->get_empty_mutable_column(),
362
0
                                         slot->get_data_type_ptr(), slot->col_name()});
363
0
            }
364
0
        }
365
0
        VLOG_TRACE << _partition_block.dump_structure();
366
5
    } else {
367
        // we insert all. but not all will be used. it will controlled by _partition_slot_locs
368
26
        for (auto* slot : _slots) {
369
26
            _partition_block.insert({slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
370
26
                                     slot->col_name()});
371
26
        }
372
5
    }
373
5
}
374
375
5
VOlapTablePartitionParam::~VOlapTablePartitionParam() {
376
5
    _mem_tracker->release(_mem_usage);
377
5
}
378
379
5
Status VOlapTablePartitionParam::init() {
380
5
    std::vector<std::string> slot_column_names;
381
26
    for (auto* slot_desc : _schema->tuple_desc()->slots()) {
382
26
        slot_column_names.emplace_back(slot_desc->col_name());
383
26
    }
384
385
5
    auto find_slot_locs = [&slot_column_names](const std::string& slot_name,
386
5
                                               std::vector<uint16_t>& locs,
387
9
                                               const std::string& column_type) {
388
9
        auto it = std::find(slot_column_names.begin(), slot_column_names.end(), slot_name);
389
9
        if (it == slot_column_names.end()) {
390
0
            return Status::InternalError("{} column not found, column ={}", column_type, slot_name);
391
0
        }
392
9
        locs.emplace_back(it - slot_column_names.begin());
393
9
        return Status::OK();
394
9
    };
395
396
    // here we find the partition columns. others maybe non-partition columns/special columns.
397
5
    if (_t_param.__isset.partition_columns) {
398
0
        for (auto& part_col : _t_param.partition_columns) {
399
0
            RETURN_IF_ERROR(find_slot_locs(part_col, _partition_slot_locs, "partition"));
400
0
        }
401
0
    }
402
403
5
    _partitions_map = std::make_unique<
404
5
            std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
405
5
            VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
406
5
    if (_t_param.__isset.distributed_columns) {
407
9
        for (auto& col : _t_param.distributed_columns) {
408
9
            RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
409
9
        }
410
5
    }
411
412
    // for both auto/non-auto partition table.
413
5
    _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
414
415
    // initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
416
5
    for (const auto& t_part : _t_param.partitions) {
417
5
        VOlapTablePartition* part = nullptr;
418
5
        RETURN_IF_ERROR(generate_partition_from(t_part, part));
419
5
        _partitions.emplace_back(part);
420
421
5
        if (!_t_param.partitions_is_fake) {
422
5
            if (_is_in_partition) {
423
0
                for (auto& in_key : part->in_keys) {
424
0
                    _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
425
0
                }
426
5
            } else {
427
5
                _partitions_map->emplace(
428
5
                        std::tuple {part->end_key.first, part->end_key.second, false}, part);
429
5
            }
430
5
        }
431
5
    }
432
433
5
    _mem_usage = _partition_block.allocated_bytes();
434
5
    _mem_tracker->consume(_mem_usage);
435
5
    return Status::OK();
436
5
}
437
438
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
439
15
                                              BlockRowWithIndicator key) const {
440
15
    VOlapTablePartKeyComparator comparator(_partition_slot_locs, _transformed_slot_locs);
441
    // we have used upper_bound to find to ensure key < part.right and this part is closest(right - key is min)
442
    // now we only have to check (key >= part.left). the comparator(a,b) means a < b, so we use anti
443
15
    return part->start_key.second == -1 /* spj: start_key.second == -1 means only single partition*/
444
15
           || !comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
445
15
}
446
447
// insert value into _partition_block's column
448
// NOLINTBEGIN(readability-function-size)
449
0
static Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) {
450
0
    auto column = std::move(*part_key->first->get_by_position(pos).column).mutate();
451
    //TODO: use assert_cast before insert_data
452
0
    switch (t_expr.node_type) {
453
0
    case TExprNodeType::DATE_LITERAL: {
454
0
        if (TypeDescriptor::from_thrift(t_expr.type).is_date_v2_type()) {
455
0
            DateV2Value<DateV2ValueType> dt;
456
0
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
457
0
                                  t_expr.date_literal.value.size())) {
458
0
                std::stringstream ss;
459
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
460
0
                return Status::InternalError(ss.str());
461
0
            }
462
0
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
463
0
        } else if (TypeDescriptor::from_thrift(t_expr.type).is_datetime_v2_type()) {
464
0
            DateV2Value<DateTimeV2ValueType> dt;
465
0
            const int32_t scale =
466
0
                    t_expr.type.types.empty() ? -1 : t_expr.type.types.front().scalar_type.scale;
467
0
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
468
0
                                  t_expr.date_literal.value.size(), scale)) {
469
0
                std::stringstream ss;
470
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
471
0
                return Status::InternalError(ss.str());
472
0
            }
473
0
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
474
0
        } else {
475
0
            VecDateTimeValue dt;
476
0
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
477
0
                                  t_expr.date_literal.value.size())) {
478
0
                std::stringstream ss;
479
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
480
0
                return Status::InternalError(ss.str());
481
0
            }
482
0
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
483
0
        }
484
0
        break;
485
0
    }
486
0
    case TExprNodeType::INT_LITERAL: {
487
0
        switch (t_expr.type.types[0].scalar_type.type) {
488
0
        case TPrimitiveType::TINYINT: {
489
0
            int8_t value = t_expr.int_literal.value;
490
0
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
491
0
            break;
492
0
        }
493
0
        case TPrimitiveType::SMALLINT: {
494
0
            int16_t value = t_expr.int_literal.value;
495
0
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
496
0
            break;
497
0
        }
498
0
        case TPrimitiveType::INT: {
499
0
            int32_t value = t_expr.int_literal.value;
500
0
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
501
0
            break;
502
0
        }
503
0
        default:
504
0
            int64_t value = t_expr.int_literal.value;
505
0
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
506
0
        }
507
0
        break;
508
0
    }
509
0
    case TExprNodeType::LARGE_INT_LITERAL: {
510
0
        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
511
0
        auto value = StringParser::string_to_int<__int128>(t_expr.large_int_literal.value.c_str(),
512
0
                                                           t_expr.large_int_literal.value.size(),
513
0
                                                           &parse_result);
514
0
        if (parse_result != StringParser::PARSE_SUCCESS) {
515
0
            value = MAX_INT128;
516
0
        }
517
0
        column->insert_data(reinterpret_cast<const char*>(&value), 0);
518
0
        break;
519
0
    }
520
0
    case TExprNodeType::STRING_LITERAL: {
521
0
        int len = t_expr.string_literal.value.size();
522
0
        const char* str_val = t_expr.string_literal.value.c_str();
523
0
        column->insert_data(str_val, len);
524
0
        break;
525
0
    }
526
0
    case TExprNodeType::BOOL_LITERAL: {
527
0
        column->insert_data(reinterpret_cast<const char*>(&t_expr.bool_literal.value), 0);
528
0
        break;
529
0
    }
530
0
    case TExprNodeType::NULL_LITERAL: {
531
        // insert a null literal
532
0
        column->insert_data(nullptr, 0);
533
0
        break;
534
0
    }
535
0
    default: {
536
0
        return Status::InternalError("unsupported partition column node type, type={}",
537
0
                                     t_expr.node_type);
538
0
    }
539
0
    }
540
0
    part_key->second = column->size() - 1;
541
0
    return Status::OK();
542
0
}
543
// NOLINTEND(readability-function-size)
544
545
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
546
0
                                                        BlockRow* part_key) {
547
0
    for (int i = 0; i < t_exprs.size(); i++) {
548
0
        RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key, _partition_slot_locs[i]));
549
0
    }
550
0
    return Status::OK();
551
0
}
552
553
Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartition& t_part,
554
5
                                                         VOlapTablePartition*& part_result) {
555
5
    DCHECK(part_result == nullptr);
556
    // here we set the default value of partition bounds first! if it doesn't have some key, it will be -1.
557
5
    part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
558
5
    part_result->id = t_part.id;
559
5
    part_result->is_mutable = t_part.is_mutable;
560
    // only load_to_single_tablet = true will set load_tablet_idx
561
5
    if (t_part.__isset.load_tablet_idx) {
562
0
        part_result->load_tablet_idx = t_part.load_tablet_idx;
563
0
    }
564
565
5
    if (_is_in_partition) {
566
0
        for (const auto& keys : t_part.in_keys) {
567
0
            RETURN_IF_ERROR(_create_partition_keys(
568
0
                    keys, &part_result->in_keys.emplace_back(&_partition_block, -1)));
569
0
        }
570
0
        if (t_part.__isset.is_default_partition && t_part.is_default_partition &&
571
0
            _default_partition == nullptr) {
572
0
            _default_partition = part_result;
573
0
        }
574
5
    } else { // range
575
5
        if (t_part.__isset.start_keys) {
576
0
            RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part_result->start_key));
577
0
        }
578
        // we generate the right bound but not insert into partition map
579
5
        if (t_part.__isset.end_keys) {
580
0
            RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part_result->end_key));
581
0
        }
582
5
    }
583
584
5
    part_result->num_buckets = t_part.num_buckets;
585
5
    auto num_indexes = _schema->indexes().size();
586
5
    if (t_part.indexes.size() != num_indexes) {
587
0
        return Status::InternalError(
588
0
                "number of partition's index is not equal with schema's"
589
0
                ", num_part_indexes={}, num_schema_indexes={}",
590
0
                t_part.indexes.size(), num_indexes);
591
0
    }
592
5
    part_result->indexes = t_part.indexes;
593
5
    std::sort(part_result->indexes.begin(), part_result->indexes.end(),
594
5
              [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
595
0
                  return lhs.index_id < rhs.index_id;
596
0
              });
597
    // check index
598
10
    for (int j = 0; j < num_indexes; ++j) {
599
5
        if (part_result->indexes[j].index_id != _schema->indexes()[j]->index_id) {
600
0
            return Status::InternalError(
601
0
                    "partition's index is not equal with schema's"
602
0
                    ", part_index={}, schema_index={}",
603
0
                    part_result->indexes[j].index_id, _schema->indexes()[j]->index_id);
604
0
        }
605
5
    }
606
5
    return Status::OK();
607
5
}
608
609
Status VOlapTablePartitionParam::add_partitions(
610
0
        const std::vector<TOlapTablePartition>& partitions) {
611
0
    for (const auto& t_part : partitions) {
612
0
        auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
613
0
        part->id = t_part.id;
614
0
        part->is_mutable = t_part.is_mutable;
615
616
        // we dont pass right keys when it's MAX_VALUE. so there's possibility we only have start_key but not end_key
617
        // range partition
618
0
        if (t_part.__isset.start_keys) {
619
0
            RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
620
0
        }
621
0
        if (t_part.__isset.end_keys) {
622
0
            RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
623
0
        }
624
        // list partition - we only set 1 value in 1 partition for new created ones
625
0
        if (t_part.__isset.in_keys) {
626
0
            for (const auto& keys : t_part.in_keys) {
627
0
                RETURN_IF_ERROR(_create_partition_keys(
628
0
                        keys, &part->in_keys.emplace_back(&_partition_block, -1)));
629
0
            }
630
0
            if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
631
0
                _default_partition = part;
632
0
            }
633
0
        }
634
635
0
        part->num_buckets = t_part.num_buckets;
636
0
        auto num_indexes = _schema->indexes().size();
637
0
        if (t_part.indexes.size() != num_indexes) {
638
0
            return Status::InternalError(
639
0
                    "number of partition's index is not equal with schema's"
640
0
                    ", num_part_indexes={}, num_schema_indexes={}",
641
0
                    t_part.indexes.size(), num_indexes);
642
0
        }
643
0
        part->indexes = t_part.indexes;
644
0
        std::sort(part->indexes.begin(), part->indexes.end(),
645
0
                  [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
646
0
                      return lhs.index_id < rhs.index_id;
647
0
                  });
648
        // check index
649
0
        for (int j = 0; j < num_indexes; ++j) {
650
0
            if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
651
0
                return Status::InternalError(
652
0
                        "partition's index is not equal with schema's"
653
0
                        ", part_index={}, schema_index={}",
654
0
                        part->indexes[j].index_id, _schema->indexes()[j]->index_id);
655
0
            }
656
0
        }
657
0
        _partitions.emplace_back(part);
658
        // after _creating_partiton_keys
659
0
        if (_is_in_partition) {
660
0
            for (auto& in_key : part->in_keys) {
661
0
                _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
662
0
            }
663
0
        } else {
664
0
            _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
665
0
                                     part);
666
0
        }
667
0
    }
668
669
0
    return Status::OK();
670
0
}
671
672
Status VOlapTablePartitionParam::replace_partitions(
673
        std::vector<int64_t>& old_partition_ids,
674
0
        const std::vector<TOlapTablePartition>& new_partitions) {
675
    // remove old replaced partitions
676
0
    DCHECK(old_partition_ids.size() == new_partitions.size());
677
678
    // init and add new partitions. insert into _partitions
679
0
    for (int i = 0; i < new_partitions.size(); i++) {
680
0
        const auto& t_part = new_partitions[i];
681
        // pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance
682
0
        VOlapTablePartition* old_part = nullptr;
683
0
        auto old_part_id = old_partition_ids[i];
684
0
        if (auto it = std::find_if(
685
0
                    _partitions.begin(), _partitions.end(),
686
0
                    [=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; });
687
0
            it != _partitions.end()) {
688
0
            old_part = *it;
689
0
        } else {
690
0
            return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id);
691
0
        }
692
693
0
        auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
694
0
        part->id = t_part.id;
695
0
        part->is_mutable = t_part.is_mutable;
696
697
        /// just substitute directly. no need to remove and reinsert keys.
698
        // range partition
699
0
        part->start_key = std::move(old_part->start_key);
700
0
        part->end_key = std::move(old_part->end_key);
701
        // list partition
702
0
        part->in_keys = std::move(old_part->in_keys);
703
0
        if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
704
0
            _default_partition = part;
705
0
        }
706
707
0
        part->num_buckets = t_part.num_buckets;
708
0
        auto num_indexes = _schema->indexes().size();
709
0
        if (t_part.indexes.size() != num_indexes) {
710
0
            return Status::InternalError(
711
0
                    "number of partition's index is not equal with schema's"
712
0
                    ", num_part_indexes={}, num_schema_indexes={}",
713
0
                    t_part.indexes.size(), num_indexes);
714
0
        }
715
0
        part->indexes = t_part.indexes;
716
0
        std::sort(part->indexes.begin(), part->indexes.end(),
717
0
                  [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
718
0
                      return lhs.index_id < rhs.index_id;
719
0
                  });
720
        // check index
721
0
        for (int j = 0; j < num_indexes; ++j) {
722
0
            if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
723
0
                return Status::InternalError(
724
0
                        "partition's index is not equal with schema's"
725
0
                        ", part_index={}, schema_index={}",
726
0
                        part->indexes[j].index_id, _schema->indexes()[j]->index_id);
727
0
            }
728
0
        }
729
730
        // add new partitions with new id.
731
0
        _partitions.emplace_back(part);
732
0
        VLOG_NOTICE << "params add new partition " << part->id;
733
734
        // replace items in _partition_maps
735
0
        if (_is_in_partition) {
736
0
            for (auto& in_key : part->in_keys) {
737
0
                (*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part;
738
0
            }
739
0
        } else {
740
0
            (*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] =
741
0
                    part;
742
0
        }
743
0
    }
744
    // remove old partitions by id
745
0
    std::ranges::sort(old_partition_ids);
746
0
    for (auto it = _partitions.begin(); it != _partitions.end();) {
747
0
        if (std::ranges::binary_search(old_partition_ids, (*it)->id)) {
748
0
            it = _partitions.erase(it);
749
0
        } else {
750
0
            it++;
751
0
        }
752
0
    }
753
754
0
    return Status::OK();
755
0
}
756
757
} // namespace doris