Coverage Report

Created: 2026-03-14 13:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/tablet_info.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/tablet_info.h"
19
20
#include <butil/logging.h>
21
#include <gen_cpp/Descriptors_types.h>
22
#include <gen_cpp/Exprs_types.h>
23
#include <gen_cpp/Partitions_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <gen_cpp/descriptors.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <cstddef>
31
#include <cstdint>
32
#include <memory>
33
#include <ostream>
34
#include <string>
35
#include <tuple>
36
37
#include "common/exception.h"
38
#include "common/logging.h"
39
#include "common/status.h"
40
#include "core/column/column.h"
41
#include "core/data_type/data_type.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type/define_primitive_type.h"
44
#include "core/data_type/primitive_type.h"
45
#include "core/value/large_int_value.h"
46
#include "runtime/descriptors.h"
47
#include "runtime/memory/mem_tracker.h"
48
#include "storage/tablet/tablet_schema.h"
49
#include "util/raw_value.h"
50
#include "util/string_parser.hpp"
51
#include "util/string_util.h"
52
// NOLINTNEXTLINE(unused-includes)
53
#include "core/value/vdatetime_value.h"
54
#include "exprs/function/cast/cast_to_timestamptz.h"
55
#include "exprs/vexpr_context.h" // IWYU pragma: keep
56
#include "exprs/vliteral.h"
57
58
namespace doris {
59
#include "common/compile_check_begin.h"
60
61
66.3k
void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
62
66.3k
    pindex->set_id(index_id);
63
66.3k
    pindex->set_schema_hash(schema_hash);
64
422k
    for (auto* slot : slots) {
65
422k
        pindex->add_columns(slot->col_name());
66
422k
    }
67
446k
    for (auto* column : columns) {
68
446k
        column->to_schema_pb(pindex->add_columns_desc());
69
446k
    }
70
66.3k
    for (auto* index : indexes) {
71
6.02k
        index->to_schema_pb(pindex->add_indexes_desc());
72
6.02k
    }
73
66.3k
}
74
75
bool VOlapTablePartKeyComparator::operator()(const BlockRowWithIndicator& lhs,
76
61.0M
                                             const BlockRowWithIndicator& rhs) const {
77
61.0M
    Block* l_block = std::get<0>(lhs);
78
61.0M
    Block* r_block = std::get<0>(rhs);
79
61.0M
    int32_t l_row = std::get<1>(lhs);
80
61.0M
    int32_t r_row = std::get<1>(rhs);
81
61.0M
    bool l_use_new = std::get<2>(lhs);
82
61.0M
    bool r_use_new = std::get<2>(rhs);
83
84
18.4E
    VLOG_TRACE << '\n' << l_block->dump_data() << '\n' << r_block->dump_data();
85
86
61.0M
    if (l_row == -1) {
87
176
        return false;
88
61.0M
    } else if (r_row == -1) {
89
33.4M
        return true;
90
33.4M
    }
91
92
27.5M
    if (_param_locs.empty()) { // no transform, use origin column
93
26.8M
        for (auto slot_loc : _slot_locs) {
94
26.8M
            auto res = l_block->get_by_position(slot_loc).column->compare_at(
95
26.8M
                    l_row, r_row, *r_block->get_by_position(slot_loc).column, -1);
96
26.8M
            if (res != 0) {
97
26.5M
                return res < 0;
98
26.5M
            }
99
26.8M
        }
100
26.8M
    } else { // use transformed column to compare
101
18.4E
        DCHECK(_slot_locs.size() == _param_locs.size())
102
18.4E
                << _slot_locs.size() << ' ' << _param_locs.size();
103
104
739k
        const std::vector<uint16_t>* l_index = l_use_new ? &_param_locs : &_slot_locs;
105
18.4E
        const std::vector<uint16_t>* r_index = r_use_new ? &_param_locs : &_slot_locs;
106
107
1.12M
        for (int i = 0; i < _slot_locs.size(); i++) {
108
1.00M
            ColumnPtr l_col = l_block->get_by_position((*l_index)[i]).column;
109
1.00M
            ColumnPtr r_col = r_block->get_by_position((*r_index)[i]).column;
110
111
1.00M
            auto res = l_col->compare_at(l_row, r_row, *r_col, -1);
112
1.00M
            if (res != 0) {
113
620k
                return res < 0;
114
620k
            }
115
1.00M
        }
116
739k
    }
117
118
    // equal, return false
119
391k
    return false;
120
27.5M
}
121
122
29.2k
Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
123
29.2k
    _db_id = pschema.db_id();
124
29.2k
    _table_id = pschema.table_id();
125
29.2k
    _version = pschema.version();
126
29.2k
    if (pschema.has_unique_key_update_mode()) {
127
29.2k
        _unique_key_update_mode = pschema.unique_key_update_mode();
128
29.2k
        if (pschema.has_sequence_map_col_unique_id()) {
129
29.2k
            _sequence_map_col_uid = pschema.sequence_map_col_unique_id();
130
29.2k
        }
131
18.4E
    } else {
132
        // for backward compatibility
133
18.4E
        if (pschema.has_partial_update() && pschema.partial_update()) {
134
0
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
135
18.4E
        } else {
136
18.4E
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
137
18.4E
        }
138
18.4E
    }
139
29.2k
    _is_strict_mode = pschema.is_strict_mode();
140
29.2k
    if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
141
2.59k
        _auto_increment_column = pschema.auto_increment_column();
142
2.59k
        if (!_auto_increment_column.empty() && pschema.auto_increment_column_unique_id() == -1) {
143
0
            return Status::InternalError(
144
0
                    "Auto increment column id is not set in FE. Maybe FE is an older version "
145
0
                    "different from BE.");
146
0
        }
147
2.59k
        _auto_increment_column_unique_id = pschema.auto_increment_column_unique_id();
148
2.59k
    }
149
29.2k
    if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
150
2.75k
        if (pschema.has_partial_update_new_key_policy()) {
151
2.75k
            _partial_update_new_row_policy = pschema.partial_update_new_key_policy();
152
2.75k
        }
153
2.75k
    }
154
29.2k
    _timestamp_ms = pschema.timestamp_ms();
155
29.2k
    if (pschema.has_nano_seconds()) {
156
29.2k
        _nano_seconds = pschema.nano_seconds();
157
29.2k
    }
158
29.2k
    _timezone = pschema.timezone();
159
160
29.2k
    for (const auto& col : pschema.partial_update_input_columns()) {
161
16.7k
        _partial_update_input_columns.insert(col);
162
16.7k
    }
163
29.2k
    std::unordered_map<std::string, SlotDescriptor*> slots_map;
164
165
29.2k
    _tuple_desc = _obj_pool.add(new TupleDescriptor(pschema.tuple_desc()));
166
167
267k
    for (const auto& p_slot_desc : pschema.slot_descs()) {
168
267k
        auto* slot_desc = _obj_pool.add(new SlotDescriptor(p_slot_desc));
169
267k
        _tuple_desc->add_slot(slot_desc);
170
267k
        std::string data_type;
171
267k
        EnumToString(TPrimitiveType, to_thrift(slot_desc->col_type()), data_type);
172
267k
        std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
173
267k
        std::string data_type_str =
174
267k
                std::to_string(int64_t(TabletColumn::get_field_type_by_string(data_type)));
175
267k
        slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
176
267k
                          slot_desc);
177
267k
    }
178
179
37.1k
    for (const auto& p_index : pschema.indexes()) {
180
37.1k
        auto* index = _obj_pool.add(new OlapTableIndexSchema());
181
37.1k
        index->index_id = p_index.id();
182
37.1k
        index->schema_hash = p_index.schema_hash();
183
290k
        for (const auto& pcolumn_desc : p_index.columns_desc()) {
184
290k
            if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
185
290k
                _partial_update_input_columns.contains(pcolumn_desc.name())) {
186
269k
                std::string is_null_str = pcolumn_desc.is_nullable() ? "true" : "false";
187
269k
                std::string data_type_str = std::to_string(
188
269k
                        int64_t(TabletColumn::get_field_type_by_string(pcolumn_desc.type())));
189
269k
                auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
190
269k
                                         is_null_str);
191
269k
                if (it == std::end(slots_map)) {
192
0
                    std::string keys {};
193
0
                    for (const auto& [key, _] : slots_map) {
194
0
                        keys += fmt::format("{},", key);
195
0
                    }
196
0
                    LOG_EVERY_SECOND(WARNING) << fmt::format(
197
0
                            "[OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema)]: "
198
0
                            "unknown index column, column={}, type={}, data_type_str={}, "
199
0
                            "is_null_str={}, slots_map.keys()=[{}], {}\npschema={}",
200
0
                            pcolumn_desc.name(), pcolumn_desc.type(), data_type_str, is_null_str,
201
0
                            keys, debug_string(), pschema.ShortDebugString());
202
203
0
                    return Status::InternalError("unknown index column, column={}, type={}",
204
0
                                                 pcolumn_desc.name(), pcolumn_desc.type());
205
0
                }
206
269k
                index->slots.emplace_back(it->second);
207
269k
            }
208
290k
            TabletColumn* tc = _obj_pool.add(new TabletColumn());
209
290k
            tc->init_from_pb(pcolumn_desc);
210
290k
            index->columns.emplace_back(tc);
211
290k
        }
212
37.1k
        for (const auto& pindex_desc : p_index.indexes_desc()) {
213
5.94k
            TabletIndex* ti = _obj_pool.add(new TabletIndex());
214
5.94k
            ti->init_from_pb(pindex_desc);
215
5.94k
            index->indexes.emplace_back(ti);
216
5.94k
        }
217
37.1k
        _indexes.emplace_back(index);
218
37.1k
    }
219
220
29.2k
    std::sort(_indexes.begin(), _indexes.end(),
221
32.7k
              [](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
222
32.7k
                  return lhs->index_id < rhs->index_id;
223
32.7k
              });
224
29.2k
    return Status::OK();
225
29.2k
}
226
227
94.4k
Status OlapTableSchemaParam::init_unique_key_update_mode(const TOlapTableSchemaParam& tschema) {
228
94.4k
    if (tschema.__isset.unique_key_update_mode) {
229
94.4k
        switch (tschema.unique_key_update_mode) {
230
87.3k
        case doris::TUniqueKeyUpdateMode::UPSERT: {
231
87.3k
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
232
87.3k
            break;
233
0
        }
234
6.87k
        case doris::TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS: {
235
6.87k
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
236
6.87k
            break;
237
0
        }
238
157
        case doris::TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS: {
239
157
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
240
157
            break;
241
0
        }
242
0
        default: {
243
0
            return Status::InternalError(
244
0
                    "Unknown unique_key_update_mode: {}, should be one of "
245
0
                    "UPSERT/UPDATE_FIXED_COLUMNS/UPDATE_FLEXIBLE_COLUMNS",
246
0
                    tschema.unique_key_update_mode);
247
0
        }
248
94.4k
        }
249
94.4k
        if (tschema.__isset.sequence_map_col_unique_id) {
250
94.4k
            _sequence_map_col_uid = tschema.sequence_map_col_unique_id;
251
94.4k
        }
252
94.4k
    } else {
253
        // for backward compatibility
254
4
        if (tschema.__isset.is_partial_update && tschema.is_partial_update) {
255
0
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
256
4
        } else {
257
4
            _unique_key_update_mode = UniqueKeyUpdateModePB::UPSERT;
258
4
        }
259
4
    }
260
94.4k
    return Status::OK();
261
94.4k
}
262
263
94.4k
Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
264
94.4k
    _db_id = tschema.db_id;
265
94.4k
    _table_id = tschema.table_id;
266
94.4k
    _version = tschema.version;
267
94.4k
    RETURN_IF_ERROR(init_unique_key_update_mode(tschema));
268
94.4k
    if (tschema.__isset.is_strict_mode) {
269
94.4k
        _is_strict_mode = tschema.is_strict_mode;
270
94.4k
    }
271
94.4k
    if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
272
6.86k
        _auto_increment_column = tschema.auto_increment_column;
273
6.86k
        if (!_auto_increment_column.empty() && tschema.auto_increment_column_unique_id == -1) {
274
0
            return Status::InternalError(
275
0
                    "Auto increment column id is not set in FE. Maybe FE is an older version "
276
0
                    "different from BE.");
277
0
        }
278
6.86k
        _auto_increment_column_unique_id = tschema.auto_increment_column_unique_id;
279
6.86k
    }
280
281
94.4k
    if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPSERT) {
282
7.02k
        if (tschema.__isset.partial_update_new_key_policy) {
283
7.02k
            switch (tschema.partial_update_new_key_policy) {
284
6.96k
            case doris::TPartialUpdateNewRowPolicy::APPEND: {
285
6.96k
                _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::APPEND;
286
6.96k
                break;
287
0
            }
288
65
            case doris::TPartialUpdateNewRowPolicy::ERROR: {
289
65
                _partial_update_new_row_policy = PartialUpdateNewRowPolicyPB::ERROR;
290
65
                break;
291
0
            }
292
0
            default: {
293
0
                return Status::InvalidArgument(
294
0
                        "Unknown partial_update_new_key_behavior: {}, should be one of "
295
0
                        "'APPEND' or 'ERROR'",
296
0
                        tschema.partial_update_new_key_policy);
297
0
            }
298
7.02k
            }
299
7.02k
        }
300
7.02k
    }
301
302
94.4k
    for (const auto& tcolumn : tschema.partial_update_input_columns) {
303
38.2k
        _partial_update_input_columns.insert(tcolumn);
304
38.2k
    }
305
94.4k
    std::unordered_map<std::string, SlotDescriptor*> slots_map;
306
94.4k
    _tuple_desc = _obj_pool.add(new TupleDescriptor(tschema.tuple_desc));
307
652k
    for (const auto& t_slot_desc : tschema.slot_descs) {
308
652k
        auto* slot_desc = _obj_pool.add(new SlotDescriptor(t_slot_desc));
309
652k
        _tuple_desc->add_slot(slot_desc);
310
652k
        std::string is_null_str = slot_desc->is_nullable() ? "true" : "false";
311
652k
        std::string data_type_str = std::to_string(int64_t(slot_desc->col_type()));
312
652k
        slots_map.emplace(to_lower(slot_desc->col_name()) + "+" + data_type_str + is_null_str,
313
652k
                          slot_desc);
314
652k
    }
315
316
96.2k
    for (const auto& t_index : tschema.indexes) {
317
96.2k
        std::unordered_map<std::string, int32_t> index_slots_map;
318
96.2k
        auto* index = _obj_pool.add(new OlapTableIndexSchema());
319
96.2k
        index->index_id = t_index.id;
320
96.2k
        index->schema_hash = t_index.schema_hash;
321
702k
        for (const auto& tcolumn_desc : t_index.columns_desc) {
322
702k
            if (_unique_key_update_mode != UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS ||
323
702k
                _partial_update_input_columns.contains(tcolumn_desc.column_name)) {
324
655k
                std::string is_null_str = tcolumn_desc.is_allow_null ? "true" : "false";
325
655k
                std::string data_type_str =
326
655k
                        std::to_string(int64_t(thrift_to_type(tcolumn_desc.column_type.type)));
327
655k
                auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
328
655k
                                         is_null_str);
329
655k
                if (it == slots_map.end()) {
330
0
                    std::stringstream ss;
331
0
                    ss << tschema;
332
0
                    std::string keys {};
333
0
                    for (const auto& [key, _] : slots_map) {
334
0
                        keys += fmt::format("{},", key);
335
0
                    }
336
0
                    LOG_EVERY_SECOND(WARNING) << fmt::format(
337
0
                            "[OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema)]: "
338
0
                            "unknown index column, column={}, type={}, data_type_str={}, "
339
0
                            "is_null_str={}, slots_map.keys()=[{}], {}\ntschema={}",
340
0
                            tcolumn_desc.column_name, tcolumn_desc.column_type.type, data_type_str,
341
0
                            is_null_str, keys, debug_string(), ss.str());
342
0
                    return Status::InternalError("unknown index column, column={}, type={}",
343
0
                                                 tcolumn_desc.column_name,
344
0
                                                 tcolumn_desc.column_type.type);
345
0
                }
346
655k
                index->slots.emplace_back(it->second);
347
655k
            }
348
702k
            index_slots_map.emplace(to_lower(tcolumn_desc.column_name), tcolumn_desc.col_unique_id);
349
702k
            TabletColumn* tc = _obj_pool.add(new TabletColumn());
350
702k
            tc->init_from_thrift(tcolumn_desc);
351
702k
            index->columns.emplace_back(tc);
352
702k
        }
353
96.2k
        if (t_index.__isset.indexes_desc) {
354
96.1k
            for (const auto& tindex_desc : t_index.indexes_desc) {
355
10.6k
                std::vector<int32_t> column_unique_ids(tindex_desc.columns.size());
356
21.3k
                for (size_t i = 0; i < tindex_desc.columns.size(); i++) {
357
10.6k
                    auto it = index_slots_map.find(to_lower(tindex_desc.columns[i]));
358
10.6k
                    if (it != index_slots_map.end()) {
359
10.6k
                        column_unique_ids[i] = it->second;
360
10.6k
                    }
361
10.6k
                }
362
10.6k
                TabletIndex* ti = _obj_pool.add(new TabletIndex());
363
10.6k
                ti->init_from_thrift(tindex_desc, column_unique_ids);
364
10.6k
                index->indexes.emplace_back(ti);
365
10.6k
            }
366
96.1k
        }
367
96.2k
        if (t_index.__isset.where_clause) {
368
66
            RETURN_IF_ERROR(VExpr::create_expr_tree(t_index.where_clause, index->where_clause));
369
66
        }
370
96.2k
        _indexes.emplace_back(index);
371
96.2k
    }
372
373
94.4k
    std::sort(_indexes.begin(), _indexes.end(),
374
94.4k
              [](const OlapTableIndexSchema* lhs, const OlapTableIndexSchema* rhs) {
375
5.01k
                  return lhs->index_id < rhs->index_id;
376
5.01k
              });
377
94.4k
    return Status::OK();
378
94.4k
}
379
380
65.2k
void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
381
65.2k
    pschema->set_db_id(_db_id);
382
65.2k
    pschema->set_table_id(_table_id);
383
65.2k
    pschema->set_version(_version);
384
65.2k
    pschema->set_unique_key_update_mode(_unique_key_update_mode);
385
65.2k
    if (_unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
386
        // for backward compatibility
387
3.41k
        pschema->set_partial_update(true);
388
3.41k
    }
389
65.2k
    pschema->set_partial_update_new_key_policy(_partial_update_new_row_policy);
390
65.2k
    pschema->set_is_strict_mode(_is_strict_mode);
391
65.2k
    pschema->set_auto_increment_column(_auto_increment_column);
392
65.2k
    pschema->set_auto_increment_column_unique_id(_auto_increment_column_unique_id);
393
65.2k
    pschema->set_timestamp_ms(_timestamp_ms);
394
65.2k
    pschema->set_timezone(_timezone);
395
65.2k
    pschema->set_nano_seconds(_nano_seconds);
396
65.2k
    pschema->set_sequence_map_col_unique_id(_sequence_map_col_uid);
397
65.2k
    for (auto col : _partial_update_input_columns) {
398
19.1k
        *pschema->add_partial_update_input_columns() = col;
399
19.1k
    }
400
65.2k
    _tuple_desc->to_protobuf(pschema->mutable_tuple_desc());
401
420k
    for (auto* slot : _tuple_desc->slots()) {
402
420k
        slot->to_protobuf(pschema->add_slot_descs());
403
420k
    }
404
66.4k
    for (auto* index : _indexes) {
405
66.4k
        index->to_protobuf(pschema->add_indexes());
406
66.4k
    }
407
65.2k
}
408
409
0
std::string OlapTableSchemaParam::debug_string() const {
410
0
    std::stringstream ss;
411
0
    ss << "tuple_desc=" << _tuple_desc->debug_string();
412
0
    return ss.str();
413
0
}
414
415
VOlapTablePartitionParam::VOlapTablePartitionParam(std::shared_ptr<OlapTableSchemaParam>& schema,
416
                                                   const TOlapTablePartitionParam& t_param)
417
68.1k
        : _schema(schema),
418
68.1k
          _t_param(t_param),
419
68.1k
          _slots(_schema->tuple_desc()->slots()),
420
68.1k
          _mem_tracker(std::make_unique<MemTracker>("OlapTablePartitionParam")),
421
68.1k
          _part_type(t_param.partition_type) {
422
68.1k
    if (t_param.__isset.enable_automatic_partition && t_param.enable_automatic_partition) {
423
332
        _is_auto_partition = true;
424
332
        auto size = t_param.partition_function_exprs.size();
425
332
        _part_func_ctx.resize(size);
426
332
        _partition_function.resize(size);
427
18.4E
        DCHECK((t_param.partition_type == TPartitionType::RANGE_PARTITIONED && size == 1) ||
428
18.4E
               (t_param.partition_type == TPartitionType::LIST_PARTITIONED && size >= 1))
429
18.4E
                << "now support only 1 partition column for auto range partitions. "
430
18.4E
                << t_param.partition_type << " " << size;
431
676
        for (int i = 0; i < size; ++i) {
432
344
            Status st =
433
344
                    VExpr::create_expr_tree(t_param.partition_function_exprs[i], _part_func_ctx[i]);
434
344
            if (!st.ok()) {
435
0
                throw Exception(Status::InternalError("Partition function expr is not valid"),
436
0
                                "Partition function expr is not valid");
437
0
            }
438
344
            _partition_function[i] = _part_func_ctx[i]->root();
439
344
        }
440
332
    }
441
442
68.1k
    if (t_param.__isset.enable_auto_detect_overwrite && t_param.enable_auto_detect_overwrite) {
443
120
        _is_auto_detect_overwrite = true;
444
120
        DCHECK(t_param.__isset.overwrite_group_id);
445
120
        _overwrite_group_id = t_param.overwrite_group_id;
446
120
    }
447
448
68.1k
    if (t_param.__isset.master_address) {
449
0
        _master_address = std::make_shared<TNetworkAddress>(t_param.master_address);
450
0
    }
451
452
68.1k
    if (_is_auto_partition) {
453
        // the nullable mode depends on partition_exprs. not column slots. so use them.
454
18.4E
        DCHECK(_partition_function.size() <= _slots.size())
455
18.4E
                << _partition_function.size() << ", " << _slots.size();
456
457
        // suppose (k0, [k1], [k2]), so get [k1, 0], [k2, 1]
458
333
        std::map<std::string, int> partition_slots_map; // name to idx in part_exprs
459
676
        for (size_t i = 0; i < t_param.partition_columns.size(); i++) {
460
343
            partition_slots_map.emplace(t_param.partition_columns[i], i);
461
343
        }
462
463
        // here we rely on the same order and number of the _part_funcs and _slots in the prefix
464
        // _part_block contains all slots of table.
465
1.31k
        for (auto* slot : _slots) {
466
            // try to replace with partition expr.
467
1.31k
            if (auto it = partition_slots_map.find(slot->col_name());
468
1.31k
                it != partition_slots_map.end()) { // it's a partition column slot
469
345
                auto& expr_type = _partition_function[it->second]->data_type();
470
345
                _partition_block.insert({expr_type->create_column(), expr_type, slot->col_name()});
471
967
            } else {
472
967
                _partition_block.insert({slot->get_empty_mutable_column(),
473
967
                                         slot->get_data_type_ptr(), slot->col_name()});
474
967
            }
475
1.31k
        }
476
18.4E
        VLOG_TRACE << _partition_block.dump_structure();
477
67.7k
    } else {
478
        // we insert all. but not all will be used. it will controlled by _partition_slot_locs
479
437k
        for (auto* slot : _slots) {
480
437k
            _partition_block.insert({slot->get_empty_mutable_column(), slot->get_data_type_ptr(),
481
437k
                                     slot->col_name()});
482
437k
        }
483
67.7k
    }
484
68.1k
}
485
486
68.2k
VOlapTablePartitionParam::~VOlapTablePartitionParam() {
487
68.2k
    _mem_tracker->release(_mem_usage);
488
68.2k
}
489
490
67.8k
Status VOlapTablePartitionParam::init() {
491
67.8k
    std::vector<std::string> slot_column_names;
492
439k
    for (auto* slot_desc : _schema->tuple_desc()->slots()) {
493
439k
        slot_column_names.emplace_back(slot_desc->col_name());
494
439k
    }
495
496
67.8k
    auto find_slot_locs = [&slot_column_names](const std::string& slot_name,
497
67.8k
                                               std::vector<uint16_t>& locs,
498
72.7k
                                               const std::string& column_type) {
499
72.7k
        auto it = std::find(slot_column_names.begin(), slot_column_names.end(), slot_name);
500
72.7k
        if (it == slot_column_names.end()) {
501
0
            return Status::InternalError("{} column not found, column ={}", column_type, slot_name);
502
0
        }
503
72.7k
        locs.emplace_back(it - slot_column_names.begin());
504
72.7k
        return Status::OK();
505
72.7k
    };
506
507
    // here we find the partition columns. others maybe non-partition columns/special columns.
508
67.8k
    if (_t_param.__isset.partition_columns) {
509
6.92k
        for (auto& part_col : _t_param.partition_columns) {
510
6.92k
            RETURN_IF_ERROR(find_slot_locs(part_col, _partition_slot_locs, "partition"));
511
6.92k
        }
512
6.83k
    }
513
514
67.8k
    _partitions_map = std::make_unique<
515
67.8k
            std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>(
516
67.8k
            VOlapTablePartKeyComparator(_partition_slot_locs, _transformed_slot_locs));
517
67.8k
    if (_t_param.__isset.distributed_columns) {
518
67.7k
        for (auto& col : _t_param.distributed_columns) {
519
65.7k
            RETURN_IF_ERROR(find_slot_locs(col, _distributed_slot_locs, "distributed"));
520
65.7k
        }
521
67.7k
    }
522
523
    // for both auto/non-auto partition table.
524
67.8k
    _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
525
526
    // initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
527
86.1k
    for (const auto& t_part : _t_param.partitions) {
528
86.1k
        VOlapTablePartition* part = nullptr;
529
86.1k
        RETURN_IF_ERROR(generate_partition_from(t_part, part));
530
86.1k
        _partitions.emplace_back(part);
531
532
86.1k
        if (!_t_param.partitions_is_fake) {
533
86.1k
            if (_is_in_partition) {
534
12.1k
                for (auto& in_key : part->in_keys) {
535
12.1k
                    _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
536
12.1k
                }
537
78.8k
            } else {
538
78.8k
                _partitions_map->emplace(
539
78.8k
                        std::tuple {part->end_key.first, part->end_key.second, false}, part);
540
78.8k
            }
541
86.1k
        }
542
86.1k
    }
543
544
67.8k
    _mem_usage = _partition_block.allocated_bytes();
545
67.8k
    _mem_tracker->consume(_mem_usage);
546
67.8k
    return Status::OK();
547
67.8k
}
548
549
bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part,
550
40.0M
                                              BlockRowWithIndicator key) const {
551
40.0M
    VOlapTablePartKeyComparator comparator(_partition_slot_locs, _transformed_slot_locs);
552
    // we have used upper_bound to find to ensure key < part.right and this part is closest(right - key is min)
553
    // now we only have to check (key >= part.left). the comparator(a,b) means a < b, so we use anti
554
40.0M
    return part->start_key.second == -1 /* spj: start_key.second == -1 means only single partition*/
555
40.0M
           || !comparator(key, std::tuple {part->start_key.first, part->start_key.second, false});
556
40.0M
}
557
558
// insert value into _partition_block's column
559
// NOLINTBEGIN(readability-function-size)
560
47.8k
static Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos) {
561
47.8k
    auto column = std::move(*part_key->first->get_by_position(pos).column).mutate();
562
    //TODO: use assert_cast before insert_data
563
47.8k
    switch (t_expr.node_type) {
564
27.7k
    case TExprNodeType::DATE_LITERAL: {
565
27.7k
        auto primitive_type =
566
27.7k
                DataTypeFactory::instance().create_data_type(t_expr.type)->get_primitive_type();
567
27.7k
        if (primitive_type == TYPE_DATEV2) {
568
22.2k
            DateV2Value<DateV2ValueType> dt;
569
22.2k
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
570
22.2k
                                  t_expr.date_literal.value.size())) {
571
0
                std::stringstream ss;
572
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
573
0
                return Status::InternalError(ss.str());
574
0
            }
575
22.2k
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
576
22.2k
        } else if (primitive_type == TYPE_DATETIMEV2) {
577
4.90k
            DateV2Value<DateTimeV2ValueType> dt;
578
4.90k
            const int32_t scale =
579
4.90k
                    t_expr.type.types.empty() ? -1 : t_expr.type.types.front().scalar_type.scale;
580
4.90k
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
581
4.90k
                                  t_expr.date_literal.value.size(), scale)) {
582
0
                std::stringstream ss;
583
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
584
0
                return Status::InternalError(ss.str());
585
0
            }
586
4.90k
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
587
4.90k
        } else if (primitive_type == TYPE_TIMESTAMPTZ) {
588
387
            TimestampTzValue res;
589
387
            CastParameters params {.status = Status::OK(), .is_strict = true};
590
387
            const int32_t scale =
591
387
                    t_expr.type.types.empty() ? -1 : t_expr.type.types.front().scalar_type.scale;
592
387
            if (!CastToTimstampTz::from_string(
593
387
                        {t_expr.date_literal.value.c_str(), t_expr.date_literal.value.size()}, res,
594
387
                        params, nullptr, scale)) [[unlikely]] {
595
0
                std::stringstream ss;
596
0
                ss << "invalid timestamptz literal in partition column, value="
597
0
                   << t_expr.date_literal;
598
0
                return Status::InternalError(ss.str());
599
387
            } else {
600
387
                column->insert_data(reinterpret_cast<const char*>(&res), 0);
601
387
            }
602
387
        } else {
603
            // TYPE_DATE (DATEV1) or TYPE_DATETIME (DATETIMEV1)
604
271
            VecDateTimeValue dt;
605
271
            if (!dt.from_date_str(t_expr.date_literal.value.c_str(),
606
271
                                  t_expr.date_literal.value.size())) {
607
0
                std::stringstream ss;
608
0
                ss << "invalid date literal in partition column, date=" << t_expr.date_literal;
609
0
                return Status::InternalError(ss.str());
610
0
            }
611
271
            if (DataTypeFactory::instance().create_data_type(t_expr.type)->get_primitive_type() ==
612
271
                TYPE_DATE) {
613
120
                dt.cast_to_date();
614
120
            }
615
271
            column->insert_data(reinterpret_cast<const char*>(&dt), 0);
616
271
        }
617
27.7k
        break;
618
27.7k
    }
619
27.7k
    case TExprNodeType::INT_LITERAL: {
620
14.4k
        switch (t_expr.type.types[0].scalar_type.type) {
621
4.02k
        case TPrimitiveType::TINYINT: {
622
4.02k
            auto value = cast_set<int8_t>(t_expr.int_literal.value);
623
4.02k
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
624
4.02k
            break;
625
0
        }
626
391
        case TPrimitiveType::SMALLINT: {
627
391
            auto value = cast_set<int16_t>(t_expr.int_literal.value);
628
391
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
629
391
            break;
630
0
        }
631
9.37k
        case TPrimitiveType::INT: {
632
9.37k
            auto value = cast_set<int32_t>(t_expr.int_literal.value);
633
9.37k
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
634
9.37k
            break;
635
0
        }
636
593
        default:
637
593
            int64_t value = t_expr.int_literal.value;
638
593
            column->insert_data(reinterpret_cast<const char*>(&value), 0);
639
14.4k
        }
640
14.2k
        break;
641
14.4k
    }
642
14.2k
    case TExprNodeType::LARGE_INT_LITERAL: {
643
189
        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
644
189
        auto value = StringParser::string_to_int<__int128>(t_expr.large_int_literal.value.c_str(),
645
189
                                                           t_expr.large_int_literal.value.size(),
646
189
                                                           &parse_result);
647
189
        if (parse_result != StringParser::PARSE_SUCCESS) {
648
0
            value = MAX_INT128;
649
0
        }
650
189
        column->insert_data(reinterpret_cast<const char*>(&value), 0);
651
189
        break;
652
14.4k
    }
653
5.33k
    case TExprNodeType::STRING_LITERAL: {
654
5.33k
        size_t len = t_expr.string_literal.value.size();
655
5.33k
        const char* str_val = t_expr.string_literal.value.c_str();
656
5.33k
        column->insert_data(str_val, len);
657
5.33k
        break;
658
14.4k
    }
659
23
    case TExprNodeType::BOOL_LITERAL: {
660
23
        column->insert_data(reinterpret_cast<const char*>(&t_expr.bool_literal.value), 0);
661
23
        break;
662
14.4k
    }
663
66
    case TExprNodeType::NULL_LITERAL: {
664
        // insert a null literal
665
66
        if (!column->is_nullable()) {
666
            // https://github.com/apache/doris/pull/39449 have forbid this cause. always add this check as protective measures
667
0
            return Status::InternalError("The column {} is not null, can't insert into NULL value.",
668
0
                                         part_key->first->get_by_position(pos).name);
669
0
        }
670
66
        column->insert_data(nullptr, 0);
671
66
        break;
672
66
    }
673
0
    default: {
674
0
        return Status::InternalError("unsupported partition column node type, type={}",
675
0
                                     t_expr.node_type);
676
66
    }
677
47.8k
    }
678
47.5k
    part_key->second = cast_set<int32_t>(column->size() - 1);
679
47.5k
    return Status::OK();
680
47.8k
}
681
// NOLINTEND(readability-function-size)
682
683
Status VOlapTablePartitionParam::_create_partition_keys(const std::vector<TExprNode>& t_exprs,
684
46.9k
                                                        BlockRow* part_key) {
685
94.7k
    for (int i = 0; i < t_exprs.size(); i++) {
686
47.8k
        RETURN_IF_ERROR(_create_partition_key(t_exprs[i], part_key, _partition_slot_locs[i]));
687
47.8k
    }
688
46.9k
    return Status::OK();
689
46.9k
}
690
691
Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartition& t_part,
692
86.5k
                                                         VOlapTablePartition*& part_result) {
693
86.5k
    DCHECK(part_result == nullptr);
694
    // here we set the default value of partition bounds first! if it doesn't have some key, it will be -1.
695
86.5k
    part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block));
696
86.5k
    part_result->id = t_part.id;
697
86.5k
    part_result->is_mutable = t_part.is_mutable;
698
    // only load_to_single_tablet = true will set load_tablet_idx
699
86.5k
    if (t_part.__isset.load_tablet_idx) {
700
35.8k
        part_result->load_tablet_idx = t_part.load_tablet_idx;
701
35.8k
    }
702
703
86.5k
    if (_is_in_partition) {
704
12.3k
        for (const auto& keys : t_part.in_keys) {
705
12.3k
            RETURN_IF_ERROR(_create_partition_keys(
706
12.3k
                    keys, &part_result->in_keys.emplace_back(&_partition_block, -1)));
707
12.3k
        }
708
7.49k
        if (t_part.__isset.is_default_partition && t_part.is_default_partition &&
709
7.49k
            _default_partition == nullptr) {
710
17
            _default_partition = part_result;
711
17
        }
712
79.0k
    } else { // range
713
79.0k
        if (t_part.__isset.start_keys) {
714
16.4k
            RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part_result->start_key));
715
16.4k
        }
716
        // we generate the right bound but not insert into partition map
717
79.0k
        if (t_part.__isset.end_keys) {
718
17.6k
            RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part_result->end_key));
719
17.6k
        }
720
79.0k
    }
721
722
86.5k
    part_result->num_buckets = t_part.num_buckets;
723
86.5k
    auto num_indexes = _schema->indexes().size();
724
86.5k
    if (t_part.indexes.size() != num_indexes) {
725
0
        return Status::InternalError(
726
0
                "number of partition's index is not equal with schema's"
727
0
                ", num_part_indexes={}, num_schema_indexes={}",
728
0
                t_part.indexes.size(), num_indexes);
729
0
    }
730
86.5k
    part_result->indexes = t_part.indexes;
731
86.5k
    std::sort(part_result->indexes.begin(), part_result->indexes.end(),
732
86.5k
              [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
733
4.55k
                  return lhs.index_id < rhs.index_id;
734
4.55k
              });
735
    // check index
736
174k
    for (int j = 0; j < num_indexes; ++j) {
737
88.2k
        if (part_result->indexes[j].index_id != _schema->indexes()[j]->index_id) {
738
0
            return Status::InternalError(
739
0
                    "partition's index is not equal with schema's"
740
0
                    ", part_index={}, schema_index={}",
741
0
                    part_result->indexes[j].index_id, _schema->indexes()[j]->index_id);
742
0
        }
743
88.2k
    }
744
86.5k
    if (t_part.__isset.total_replica_num) {
745
86.3k
        part_result->total_replica_num = t_part.total_replica_num;
746
86.3k
    }
747
86.5k
    if (t_part.__isset.load_required_replica_num) {
748
86.3k
        part_result->load_required_replica_num = t_part.load_required_replica_num;
749
86.3k
    }
750
86.5k
    return Status::OK();
751
86.5k
}
752
753
Status VOlapTablePartitionParam::add_partitions(
754
173
        const std::vector<TOlapTablePartition>& partitions) {
755
342
    for (const auto& t_part : partitions) {
756
342
        auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
757
342
        part->id = t_part.id;
758
342
        part->is_mutable = t_part.is_mutable;
759
760
        // we dont pass right keys when it's MAX_VALUE. so there's possibility we only have start_key but not end_key
761
        // range partition
762
342
        if (t_part.__isset.start_keys) {
763
149
            RETURN_IF_ERROR(_create_partition_keys(t_part.start_keys, &part->start_key));
764
149
        }
765
342
        if (t_part.__isset.end_keys) {
766
148
            RETURN_IF_ERROR(_create_partition_keys(t_part.end_keys, &part->end_key));
767
148
        }
768
        // list partition - we only set 1 value in 1 partition for new created ones
769
342
        if (t_part.__isset.in_keys) {
770
190
            for (const auto& keys : t_part.in_keys) {
771
190
                RETURN_IF_ERROR(_create_partition_keys(
772
190
                        keys, &part->in_keys.emplace_back(&_partition_block, -1)));
773
190
            }
774
190
            if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
775
0
                _default_partition = part;
776
0
            }
777
190
        }
778
779
342
        part->num_buckets = t_part.num_buckets;
780
342
        auto num_indexes = _schema->indexes().size();
781
342
        if (t_part.indexes.size() != num_indexes) {
782
0
            return Status::InternalError(
783
0
                    "number of partition's index is not equal with schema's"
784
0
                    ", num_part_indexes={}, num_schema_indexes={}",
785
0
                    t_part.indexes.size(), num_indexes);
786
0
        }
787
342
        part->indexes = t_part.indexes;
788
342
        std::sort(part->indexes.begin(), part->indexes.end(),
789
342
                  [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
790
0
                      return lhs.index_id < rhs.index_id;
791
0
                  });
792
        // check index
793
684
        for (int j = 0; j < num_indexes; ++j) {
794
342
            if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
795
0
                return Status::InternalError(
796
0
                        "partition's index is not equal with schema's"
797
0
                        ", part_index={}, schema_index={}",
798
0
                        part->indexes[j].index_id, _schema->indexes()[j]->index_id);
799
0
            }
800
342
        }
801
342
        _partitions.emplace_back(part);
802
        // after _creating_partiton_keys
803
342
        if (_is_in_partition) {
804
190
            for (auto& in_key : part->in_keys) {
805
190
                _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
806
190
            }
807
190
        } else {
808
152
            _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
809
152
                                     part);
810
152
        }
811
342
    }
812
813
173
    return Status::OK();
814
173
}
815
816
Status VOlapTablePartitionParam::replace_partitions(
817
        std::vector<int64_t>& old_partition_ids,
818
20
        const std::vector<TOlapTablePartition>& new_partitions) {
819
    // remove old replaced partitions
820
20
    DCHECK(old_partition_ids.size() == new_partitions.size());
821
822
    // init and add new partitions. insert into _partitions
823
52
    for (int i = 0; i < new_partitions.size(); i++) {
824
32
        const auto& t_part = new_partitions[i];
825
        // pair old_partition_ids and new_partitions one by one. TODO: sort to opt performance
826
32
        VOlapTablePartition* old_part = nullptr;
827
32
        auto old_part_id = old_partition_ids[i];
828
32
        if (auto it = std::find_if(
829
32
                    _partitions.begin(), _partitions.end(),
830
76
                    [=](const VOlapTablePartition* lhs) { return lhs->id == old_part_id; });
831
32
            it != _partitions.end()) {
832
32
            old_part = *it;
833
32
        } else {
834
0
            return Status::InternalError("Cannot find old tablet {} in replacing", old_part_id);
835
0
        }
836
837
32
        auto* part = _obj_pool.add(new VOlapTablePartition(&_partition_block));
838
32
        part->id = t_part.id;
839
32
        part->is_mutable = t_part.is_mutable;
840
841
        /// just substitute directly. no need to remove and reinsert keys.
842
        // range partition
843
32
        part->start_key = std::move(old_part->start_key);
844
32
        part->end_key = std::move(old_part->end_key);
845
        // list partition
846
32
        part->in_keys = std::move(old_part->in_keys);
847
32
        if (t_part.__isset.is_default_partition && t_part.is_default_partition) {
848
0
            _default_partition = part;
849
0
        }
850
851
32
        part->num_buckets = t_part.num_buckets;
852
32
        auto num_indexes = _schema->indexes().size();
853
32
        if (t_part.indexes.size() != num_indexes) {
854
0
            return Status::InternalError(
855
0
                    "number of partition's index is not equal with schema's"
856
0
                    ", num_part_indexes={}, num_schema_indexes={}",
857
0
                    t_part.indexes.size(), num_indexes);
858
0
        }
859
32
        part->indexes = t_part.indexes;
860
32
        std::sort(part->indexes.begin(), part->indexes.end(),
861
32
                  [](const OlapTableIndexTablets& lhs, const OlapTableIndexTablets& rhs) {
862
0
                      return lhs.index_id < rhs.index_id;
863
0
                  });
864
        // check index
865
64
        for (int j = 0; j < num_indexes; ++j) {
866
32
            if (part->indexes[j].index_id != _schema->indexes()[j]->index_id) {
867
0
                return Status::InternalError(
868
0
                        "partition's index is not equal with schema's"
869
0
                        ", part_index={}, schema_index={}",
870
0
                        part->indexes[j].index_id, _schema->indexes()[j]->index_id);
871
0
            }
872
32
        }
873
874
        // add new partitions with new id.
875
32
        _partitions.emplace_back(part);
876
32
        VLOG_NOTICE << "params add new partition " << part->id;
877
878
        // replace items in _partition_maps
879
32
        if (_is_in_partition) {
880
44
            for (auto& in_key : part->in_keys) {
881
44
                (*_partitions_map)[std::tuple {in_key.first, in_key.second, false}] = part;
882
44
            }
883
21
        } else {
884
11
            (*_partitions_map)[std::tuple {part->end_key.first, part->end_key.second, false}] =
885
11
                    part;
886
11
        }
887
32
    }
888
    // remove old partitions by id
889
20
    std::ranges::sort(old_partition_ids);
890
129
    for (auto it = _partitions.begin(); it != _partitions.end();) {
891
109
        if (std::ranges::binary_search(old_partition_ids, (*it)->id)) {
892
32
            it = _partitions.erase(it);
893
77
        } else {
894
77
            it++;
895
77
        }
896
109
    }
897
898
20
    return Status::OK();
899
20
}
900
#include "common/compile_check_end.h"
901
902
} // namespace doris