Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/olap/schema_change.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/schema_change.h"
19
20
#include <gen_cpp/AgentService_types.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/olap_file.pb.h>
23
24
#include <algorithm>
25
#include <exception>
26
#include <map>
27
#include <mutex>
28
#include <roaring/roaring.hh>
29
#include <tuple>
30
31
#include "common/logging.h"
32
#include "common/signal_handler.h"
33
#include "common/status.h"
34
#include "gutil/hash/hash.h"
35
#include "gutil/integral_types.h"
36
#include "gutil/strings/numbers.h"
37
#include "io/fs/file_system.h"
38
#include "io/io_common.h"
39
#include "olap/data_dir.h"
40
#include "olap/delete_handler.h"
41
#include "olap/field.h"
42
#include "olap/iterators.h"
43
#include "olap/merger.h"
44
#include "olap/olap_common.h"
45
#include "olap/olap_define.h"
46
#include "olap/rowset/beta_rowset.h"
47
#include "olap/rowset/rowset_meta.h"
48
#include "olap/rowset/rowset_reader_context.h"
49
#include "olap/rowset/rowset_writer_context.h"
50
#include "olap/rowset/segment_v2/column_reader.h"
51
#include "olap/rowset/segment_v2/inverted_index_desc.h"
52
#include "olap/rowset/segment_v2/inverted_index_writer.h"
53
#include "olap/rowset/segment_v2/segment.h"
54
#include "olap/schema.h"
55
#include "olap/segment_loader.h"
56
#include "olap/storage_engine.h"
57
#include "olap/tablet.h"
58
#include "olap/tablet_manager.h"
59
#include "olap/tablet_meta.h"
60
#include "olap/tablet_schema.h"
61
#include "olap/types.h"
62
#include "olap/utils.h"
63
#include "olap/wrapper_field.h"
64
#include "runtime/memory/mem_tracker.h"
65
#include "runtime/runtime_state.h"
66
#include "util/debug_points.h"
67
#include "util/defer_op.h"
68
#include "util/trace.h"
69
#include "vec/aggregate_functions/aggregate_function.h"
70
#include "vec/aggregate_functions/aggregate_function_reader.h"
71
#include "vec/columns/column.h"
72
#include "vec/columns/column_nullable.h"
73
#include "vec/common/assert_cast.h"
74
#include "vec/core/block.h"
75
#include "vec/core/column_with_type_and_name.h"
76
#include "vec/exprs/vexpr.h"
77
#include "vec/exprs/vexpr_context.h"
78
#include "vec/olap/olap_data_convertor.h"
79
80
namespace doris {
81
class CollectionValue;
82
83
using namespace ErrorCode;
84
85
// Upper bound on the number of rows MultiBlockMerger::merge() puts into one
// output block before handing it to the rowset writer.
constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
86
87
class MultiBlockMerger {
88
public:
89
0
    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
90
91
    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
92
0
                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
93
0
        int rows = 0;
94
0
        for (auto& block : blocks) {
95
0
            rows += block->rows();
96
0
        }
97
0
        if (!rows) {
98
0
            return Status::OK();
99
0
        }
100
101
0
        std::vector<RowRef> row_refs;
102
0
        row_refs.reserve(rows);
103
0
        for (auto& block : blocks) {
104
0
            for (uint16_t i = 0; i < block->rows(); i++) {
105
0
                row_refs.emplace_back(block.get(), i);
106
0
            }
107
0
        }
108
        // TODO: try to use pdqsort to replace std::sort
109
        // The block version is incremental.
110
0
        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
111
112
0
        auto finalized_block = _tablet->tablet_schema()->create_block();
113
0
        int columns = finalized_block.columns();
114
0
        *merged_rows += rows;
115
116
0
        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
117
0
            auto tablet_schema = _tablet->tablet_schema();
118
0
            int key_number = _tablet->num_key_columns();
119
120
0
            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
121
0
            std::vector<vectorized::AggregateDataPtr> agg_places;
122
123
0
            for (int i = key_number; i < columns; i++) {
124
0
                try {
125
0
                    vectorized::AggregateFunctionPtr function =
126
0
                            tablet_schema->column(i).get_aggregate_function(
127
0
                                    vectorized::AGG_LOAD_SUFFIX);
128
0
                    agg_functions.push_back(function);
129
                    // create aggregate data
130
0
                    vectorized::AggregateDataPtr place = new char[function->size_of_data()];
131
0
                    function->create(place);
132
0
                    agg_places.push_back(place);
133
0
                } catch (...) {
134
0
                    for (int j = 0; j < i - key_number; ++j) {
135
0
                        agg_functions[j]->destroy(agg_places[j]);
136
0
                        delete[] agg_places[j];
137
0
                    }
138
0
                    throw;
139
0
                }
140
0
            }
141
142
0
            DEFER({
143
0
                for (int i = 0; i < columns - key_number; i++) {
144
0
                    agg_functions[i]->destroy(agg_places[i]);
145
0
                    delete[] agg_places[i];
146
0
                }
147
0
            });
148
149
0
            for (int i = 0; i < rows; i++) {
150
0
                auto row_ref = row_refs[i];
151
152
0
                for (int j = key_number; j < columns; j++) {
153
0
                    auto column_ptr = row_ref.get_column(j).get();
154
0
                    agg_functions[j - key_number]->add(
155
0
                            agg_places[j - key_number],
156
0
                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
157
0
                            nullptr);
158
0
                }
159
160
0
                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
161
0
                    for (int j = 0; j < key_number; j++) {
162
0
                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
163
0
                                *row_ref.get_column(j), row_ref.position);
164
0
                    }
165
166
0
                    for (int j = key_number; j < columns; j++) {
167
0
                        agg_functions[j - key_number]->insert_result_into(
168
0
                                agg_places[j - key_number],
169
0
                                finalized_block.get_by_position(j).column->assume_mutable_ref());
170
0
                        agg_functions[j - key_number]->reset(agg_places[j - key_number]);
171
0
                    }
172
173
0
                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
174
0
                        *merged_rows -= finalized_block.rows();
175
0
                        rowset_writer->add_block(&finalized_block);
176
0
                        finalized_block.clear_column_data();
177
0
                    }
178
0
                }
179
0
            }
180
0
        } else {
181
0
            std::vector<RowRef> pushed_row_refs;
182
0
            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
183
0
                std::swap(pushed_row_refs, row_refs);
184
0
            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
185
0
                for (int i = 0; i < rows; i++) {
186
0
                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
187
0
                        pushed_row_refs.push_back(row_refs[i]);
188
0
                    }
189
0
                }
190
0
            }
191
192
            // update real inserted row number
193
0
            rows = pushed_row_refs.size();
194
0
            *merged_rows -= rows;
195
196
0
            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
197
0
                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
198
199
0
                for (int idx = 0; idx < columns; idx++) {
200
0
                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();
201
202
0
                    for (int j = 0; j < limit; j++) {
203
0
                        auto row_ref = pushed_row_refs[i + j];
204
0
                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
205
0
                    }
206
0
                }
207
0
                rowset_writer->add_block(&finalized_block);
208
0
                finalized_block.clear_column_data();
209
0
            }
210
0
        }
211
212
0
        RETURN_IF_ERROR(rowset_writer->flush());
213
0
        return Status::OK();
214
0
    }
215
216
private:
217
    struct RowRef {
218
        RowRef(vectorized::Block* block_, uint16_t position_)
219
0
                : block(block_), position(position_) {}
220
0
        vectorized::ColumnPtr get_column(int index) const {
221
0
            return block->get_by_position(index).column;
222
0
        }
223
        const vectorized::Block* block;
224
        uint16_t position;
225
    };
226
227
    struct RowRefComparator {
228
0
        RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {}
229
230
0
        int compare(const RowRef& lhs, const RowRef& rhs) const {
231
0
            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
232
0
        }
233
234
0
        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
235
0
            return compare(lhs, rhs) < 0;
236
0
        }
237
238
        const size_t _num_columns;
239
    };
240
241
    TabletSharedPtr _tablet;
242
    RowRefComparator _cmp;
243
};
244
245
// Initialises `_desc_tbl` from `desc_tbl` and sizes the column-mapping table so
// that it holds one entry per column of `tablet_schema`.
BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl)
        : _desc_tbl(desc_tbl) {
    const auto mapping_count = tablet_schema->num_columns();
    _schema_mapping.resize(mapping_count);
}
249
250
0
// Releases the default value held by every column mapping, then empties the
// mapping table.
BlockChanger::~BlockChanger() {
    for (auto& mapping : _schema_mapping) {
        SAFE_DELETE(mapping.default_value);
    }
    _schema_mapping.clear();
}
256
257
0
// Returns a pointer to the mapping for `column_index`, or nullptr when the
// index is outside the mapping table.
ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) {
    const bool in_range = column_index < _schema_mapping.size();
    return in_range ? &(_schema_mapping[column_index]) : nullptr;
}
264
265
// Converts one block read with the base schema (`ref_block`) into the layout
// of the new schema (`new_block`).
//
// Steps:
//   1. If a where-expression is set, filter `ref_block` in place.
//   2. For each column of `new_block`, in order:
//        - mapping has an expr: evaluate it on `ref_block`, validate the
//          result, and remember (result column -> new column);
//        - mapping has no reference column: fill the new column with the
//          mapping's default value (an error for ROLLUP jobs);
//        - otherwise remember (reference column -> new column).
//   3. Move the remembered columns into `new_block`, adapting nullability.
//
// Returns OK on success, or the first error produced by expression
// preparation/execution or by the validation checks.
Status BlockChanger::change_block(vectorized::Block* ref_block,
                                  vectorized::Block* new_block) const {
    ObjectPool pool;
    RuntimeState* state = pool.add(RuntimeState::create_unique().release());
    state->set_desc_tbl(&_desc_tbl);
    state->set_be_exec_version(_fe_compatible_version);
    RowDescriptor row_desc(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

    if (_where_expr != nullptr) {
        vectorized::VExprContextSPtr where_ctx = nullptr;
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, where_ctx));
        RETURN_IF_ERROR(where_ctx->prepare(state, row_desc));
        RETURN_IF_ERROR(where_ctx->open(state));

        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(where_ctx.get(), ref_block,
                                                               ref_block->columns()));
    }

    // Row count is taken after filtering.
    const int row_size = ref_block->rows();
    const int column_size = new_block->columns();

    // swap ref_block[key] and new_block[value]
    std::list<std::pair<int, int>> swap_idx_list;
    for (int idx = 0; idx < column_size; idx++) {
        const auto& mapping = _schema_mapping[idx];

        if (mapping.expr != nullptr) {
            vectorized::VExprContextSPtr expr_ctx;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*mapping.expr, expr_ctx));
            RETURN_IF_ERROR(expr_ctx->prepare(state, row_desc));
            RETURN_IF_ERROR(expr_ctx->open(state));

            int result_column_id = -1;
            RETURN_IF_ERROR(expr_ctx->execute(ref_block, &result_column_id));
            if (ref_block->get_by_position(result_column_id).column == nullptr) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
                        "{} result column is nullptr",
                        ref_block->get_by_position(result_column_id).name);
            }
            ref_block->replace_by_position_if_const(result_column_id);

            if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
                        "{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
                        row_size, ref_block->get_by_position(result_column_id).column->size());
            }
            // NOTE(review): the "before" column is looked up in ref_block with the
            // new-block index `idx` — confirm this is the intended source column.
            RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column,
                                              ref_block->get_by_position(result_column_id).column,
                                              _type));
            swap_idx_list.emplace_back(result_column_id, idx);
            continue;
        }

        if (mapping.ref_column >= 0) {
            // same type, just swap column
            swap_idx_list.emplace_back(mapping.ref_column, idx);
            continue;
        }

        // No expression and no reference column.
        if (_type == ROLLUP) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "rollup job meet invalid ref_column, new_column={}",
                    mapping.new_column->name());
        }

        // new column, write default value
        auto default_value = mapping.default_value;
        auto target_column = new_block->get_by_position(idx).column->assume_mutable();
        if (default_value->is_null()) {
            DCHECK(target_column->is_nullable());
            target_column->insert_many_defaults(row_size);
        } else {
            auto type_info = get_type_info(mapping.new_column);
            DefaultValueColumnIterator::insert_default_data(type_info.get(), default_value->size(),
                                                            default_value->ptr(), target_column,
                                                            row_size);
        }
    }

    for (const auto& [ref_idx, new_idx] : swap_idx_list) {
        auto& ref_col = ref_block->get_by_position(ref_idx).column;
        auto& new_col = new_block->get_by_position(new_idx).column;

        const bool ref_col_nullable = ref_col->is_nullable();
        const bool new_col_nullable = new_col->is_nullable();

        if (ref_col_nullable == new_col_nullable) {
            // Same nullability: the new block simply takes over the column.
            new_col = ref_col;
            continue;
        }

        if (new_col_nullable) {
            // not nullable to nullable
            auto* new_nullable_col =
                    assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get());

            new_nullable_col->change_nested_column(ref_col);
            new_nullable_col->get_null_map_data().resize_fill(ref_col->size());
        } else {
            // nullable to not nullable:
            // suppose column `c_phone` is originally varchar(16) NOT NULL,
            // then do schema change `alter table test modify column c_phone int not null`,
            // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
            // so need to handle nullable to not nullable here
            auto* ref_nullable_col =
                    assert_cast<vectorized::ColumnNullable*>(ref_col->assume_mutable().get());

            new_col = ref_nullable_col->get_nested_column_ptr();
        }
    }
    return Status::OK();
}
371
372
// This check is to prevent schema-change from causing data loss
373
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
374
                                       vectorized::ColumnPtr new_column,
375
0
                                       AlterTabletType type) const {
376
0
    if (ref_column->size() != new_column->size()) {
377
0
        return Status::InternalError(
378
0
                "column size is changed, ref_column_size={}, new_column_size={}",
379
0
                ref_column->size(), new_column->size());
380
0
    }
381
0
    if (type == ROLLUP) {
382
0
        return Status::OK();
383
0
    }
384
0
    if (ref_column->is_nullable() != new_column->is_nullable()) {
385
0
        if (ref_column->is_nullable()) {
386
0
            auto* ref_null_map =
387
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
388
0
                            ->get_null_map_column()
389
0
                            .get_data()
390
0
                            .data();
391
392
0
            bool is_changed = false;
393
0
            for (size_t i = 0; i < ref_column->size(); i++) {
394
0
                is_changed |= ref_null_map[i];
395
0
            }
396
0
            if (is_changed) {
397
0
                return Status::DataQualityError("Null data is changed to not nullable");
398
0
            }
399
0
        } else {
400
0
            const auto& null_map_column =
401
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
402
0
                            ->get_null_map_column();
403
0
            const auto& nested_column =
404
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
405
0
                            ->get_nested_column();
406
0
            const auto* new_null_map = null_map_column.get_data().data();
407
408
0
            if (null_map_column.size() != new_column->size() ||
409
0
                nested_column.size() != new_column->size()) {
410
0
                DCHECK(false) << "null_map_column_size=" << null_map_column.size()
411
0
                              << " new_column_size=" << new_column->size()
412
0
                              << " nested_column_size=" << nested_column.size();
413
0
                return Status::InternalError(
414
0
                        "null_map_column size is changed, null_map_column_size={}, "
415
0
                        "new_column_size={}",
416
0
                        null_map_column.size(), new_column->size());
417
0
            }
418
419
0
            bool is_changed = false;
420
0
            for (size_t i = 0; i < ref_column->size(); i++) {
421
0
                is_changed |= new_null_map[i];
422
0
            }
423
0
            if (is_changed) {
424
0
                return Status::DataQualityError("Some data is changed to null");
425
0
            }
426
0
        }
427
0
    }
428
429
0
    if (ref_column->is_nullable() && new_column->is_nullable()) {
430
0
        auto* ref_null_map =
431
0
                vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
432
0
                        ->get_null_map_column()
433
0
                        .get_data()
434
0
                        .data();
435
0
        auto* new_null_map =
436
0
                vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
437
0
                        ->get_null_map_column()
438
0
                        .get_data()
439
0
                        .data();
440
441
0
        bool is_changed = false;
442
0
        for (size_t i = 0; i < ref_column->size(); i++) {
443
0
            is_changed |= (ref_null_map[i] != new_null_map[i]);
444
0
        }
445
0
        if (is_changed) {
446
0
            return Status::DataQualityError("is_null of data is changed!");
447
0
        }
448
0
    }
449
0
    return Status::OK();
450
0
}
451
452
// Converts one rowset by linking it into the new tablet's rowset writer.
//
// When the reader's rowset type differs from the writer's, linking is not
// possible and the work is delegated to the procedure returned by
// SchemaChangeHandler::get_sc_procedure(_changer, false, true). Otherwise the
// rowset is added through add_rowset_for_linked_schema_change(); for
// merge-on-write unique-key tablets the rowset's delete-bitmap entries are
// additionally copied to the new tablet under the writer's rowset id.
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
                                   TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
                                   TabletSchemaSPtr base_tablet_schema) {
    // In some cases, there may be more than one type of rowset in a tablet,
    // in which case the conversion cannot be done directly by linked schema change,
    // but requires direct schema change to rewrite the data.
    if (rowset_reader->type() != rowset_writer->type()) {
        LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id()
                  << " in base tablet is not same as type " << rowset_writer->type()
                  << ", use direct schema change.";
        auto direct_procedure = SchemaChangeHandler::get_sc_procedure(_changer, false, true);
        return direct_procedure->process(rowset_reader, rowset_writer, new_tablet, base_tablet,
                                         base_tablet_schema);
    }

    Status link_status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
    if (!link_status) {
        LOG(WARNING) << "fail to convert rowset."
                     << ", new_tablet=" << new_tablet->full_name()
                     << ", version=" << rowset_writer->version().first << "-"
                     << rowset_writer->version().second << ", error status " << link_status;
        return link_status;
    }

    const bool is_merge_on_write = new_tablet->keys_type() == UNIQUE_KEYS &&
                                   new_tablet->enable_unique_key_merge_on_write();
    if (!is_merge_on_write) {
        return Status::OK();
    }

    // copy delete bitmap to new tablet.
    DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
    base_tablet->tablet_meta()->delete_bitmap().subset(
            {rowset_reader->rowset()->rowset_id(), 0, 0},
            {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX}, &origin_delete_bitmap);
    for (auto& entry : origin_delete_bitmap.delete_bitmap) {
        // Same second/third key components, re-keyed to the writer's rowset id.
        int ret = new_tablet->tablet_meta()->delete_bitmap().set(
                {rowset_writer->rowset_id(), std::get<1>(entry.first), std::get<2>(entry.first)},
                entry.second);
        DCHECK(ret == 1);
    }
    return Status::OK();
}
494
495
// Reads the rowset block by block, converts each block with the changer and
// appends it to `rowset_writer`, then flushes the writer.
//
// Reading stops at END_OF_FILE; a final block that arrives together with
// END_OF_FILE is still converted and written if it contains rows. Any other
// read error, a conversion error or a write error is returned immediately.
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                             RowsetWriter* rowset_writer,
                                             TabletSharedPtr new_tablet,
                                             TabletSchemaSPtr base_tablet_schema) {
    for (bool reached_eof = false; !reached_eof;) {
        // tablet may be dropped due to user cancel, schema change thread should fast fail
        // and release tablet lock.
        if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
            return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
                    "fail to process tablet because it is to be deleted. tablet_id={}",
                    new_tablet->tablet_id());
        }

        // Fresh source and destination blocks for every iteration.
        auto new_block =
                vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
        auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());

        auto read_status = rowset_reader->next_block(ref_block.get());
        if (!read_status) {
            if (!read_status.is<ErrorCode::END_OF_FILE>()) {
                return read_status;
            }
            if (ref_block->rows() == 0) {
                break;
            }
            // EOF with trailing rows: process them, then leave the loop.
            reached_eof = true;
        }

        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
        RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));
    }

    RETURN_IF_ERROR(rowset_writer->flush());
    return Status::OK();
}
532
533
// Stores the changer and the memory limit, starts the temporary delta versions
// at Version::mock(), and creates a MemTracker whose label embeds the address
// of `changer`.
VSchemaChangeWithSorting::VSchemaChangeWithSorting(const BlockChanger& changer,
                                                   size_t memory_limitation)
        : _changer(changer),
          _memory_limitation(memory_limitation),
          _temp_delta_versions(Version::mock()) {
    const std::string tracker_label =
            fmt::format("VSchemaChangeWithSorting:changer={}", std::to_string(int64(&changer)));
    _mem_tracker = std::make_unique<MemTracker>(tracker_label);
}
541
542
// Converts one rowset whose rows must be re-sorted for the new tablet.
//
// Converted blocks are buffered in memory. Whenever the tracked memory would
// exceed the limit (or already exceeds 66% of it), the buffered blocks are
// sorted and written out as an intermediate rowset (internal sorting). After
// the reader is exhausted the remaining buffer is written the same way, and
// all intermediate rowsets are merged into `rowset_writer` (external sorting).
// If no intermediate rowset was produced, `rowset_writer` is only flushed.
//
// Intermediate rowsets are handed to StorageEngine::add_unused_rowset() on
// every exit path.
Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                RowsetWriter* rowset_writer,
                                                TabletSharedPtr new_tablet,
                                                TabletSchemaSPtr base_tablet_schema) {
    // for internal sorting
    std::vector<std::unique_ptr<vectorized::Block>> blocks;

    // for external sorting
    // src_rowsets to store the rowset generated by internal sorting
    std::vector<RowsetSharedPtr> src_rowsets;

    Defer defer {[&]() {
        // remove the intermediate rowsets generated by internal sorting
        for (auto& intermediate : src_rowsets) {
            StorageEngine::instance()->add_unused_rowset(intermediate);
        }
    }};

    RowsetSharedPtr rowset = rowset_reader->rowset();
    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
    int64_t newest_write_timestamp = rowset->newest_write_timestamp();
    _temp_delta_versions.first = _temp_delta_versions.second;

    // Sorts the buffered blocks into one intermediate rowset, releases their
    // tracked memory and advances the temporary version. No-op on empty buffer.
    auto create_rowset = [&]() -> Status {
        if (blocks.empty()) {
            return Status::OK();
        }

        RowsetSharedPtr sorted_rowset;
        RETURN_IF_ERROR(_internal_sorting(
                blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
                newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap,
                &sorted_rowset));
        src_rowsets.push_back(sorted_rowset);

        for (auto& buffered : blocks) {
            _mem_tracker->release(buffered->allocated_bytes());
        }
        blocks.clear();

        // increase temp version
        _temp_delta_versions.second++;
        return Status::OK();
    };

    auto new_block = vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());

    // Reserve some memory for use by other parts of this job
    constexpr double HOLD_BLOCK_MEMORY_RATE = 0.66;

    // True when buffering `new_block` would push tracked memory over the limit.
    auto exceeds_limit_with_new_block = [&]() {
        return _mem_tracker->consumption() + new_block->allocated_bytes() > _memory_limitation;
    };

    for (bool reached_eof = false; !reached_eof;) {
        // tablet may be dropped due to user cancel, schema change thread should fast fail
        // and release tablet lock.
        if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
            return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
                    "fail to process tablet because it is to be deleted. tablet_id={}",
                    new_tablet->tablet_id());
        }

        auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
        auto read_status = rowset_reader->next_block(ref_block.get());
        if (!read_status) {
            if (!read_status.is<ErrorCode::END_OF_FILE>()) {
                return read_status;
            }
            if (ref_block->rows() == 0) {
                break;
            }
            // EOF with trailing rows: process them, then leave the loop.
            reached_eof = true;
        }

        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));

        if (exceeds_limit_with_new_block() ||
            _mem_tracker->consumption() > _memory_limitation * HOLD_BLOCK_MEMORY_RATE) {
            RETURN_IF_ERROR(create_rowset());

            // Still over the limit after spilling: this block alone does not fit.
            if (exceeds_limit_with_new_block()) {
                return Status::Error<INVALID_ARGUMENT>(
                        "Memory limitation is too small for Schema Change. _memory_limitation={}, "
                        "new_block->allocated_bytes()={}, consumption={}",
                        _memory_limitation, new_block->allocated_bytes(),
                        _mem_tracker->consumption());
            }
        }
        _mem_tracker->consume(new_block->allocated_bytes());

        // move unique ptr: the filled block goes into the buffer and `new_block`
        // receives a fresh empty block for the next iteration.
        blocks.push_back(
                vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block()));
        swap(blocks.back(), new_block);
    }

    RETURN_IF_ERROR(create_rowset());

    if (src_rowsets.empty()) {
        RETURN_IF_ERROR(rowset_writer->flush());
    } else {
        RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
    }

    return Status::OK();
}
645
646
// Sorts/merges the buffered `blocks` with a MultiBlockMerger and builds a new
// rowset of the given `version` on `new_tablet`.
//
// On success `*rowset` holds the built rowset and the number of rows merged
// away is added through _add_merged_rows(). `new_rowset_type` is not used by
// this function.
Status VSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t newest_write_timestamp, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
    // Describe the rowset to be written.
    RowsetWriterContext writer_context;
    writer_context.version = version;
    writer_context.rowset_state = VISIBLE;
    writer_context.segments_overlap = segments_overlap;
    writer_context.tablet_schema = new_tablet->tablet_schema();
    writer_context.newest_write_timestamp = newest_write_timestamp;
    writer_context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;

    std::unique_ptr<RowsetWriter> sorted_writer;
    RETURN_IF_ERROR(new_tablet->create_rowset_writer(writer_context, &sorted_writer));

    uint64_t rows_merged_away = 0;
    MultiBlockMerger block_merger(new_tablet);
    RETURN_IF_ERROR(block_merger.merge(blocks, sorted_writer.get(), &rows_merged_away));
    _add_merged_rows(rows_merged_away);

    RETURN_IF_ERROR(sorted_writer->build(*rowset));
    return Status::OK();
}
667
668
// Merges the intermediate rowsets in `src_rowsets` into `rowset_writer` via
// Merger::vmerge_rowsets() and records the merged/filtered row counts it
// reports.
Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                   RowsetWriter* rowset_writer,
                                                   TabletSharedPtr new_tablet) {
    // One reader per intermediate rowset, in the same order.
    std::vector<RowsetReaderSharedPtr> readers;
    for (auto& src_rowset : src_rowsets) {
        RowsetReaderSharedPtr reader;
        RETURN_IF_ERROR(src_rowset->create_reader(&reader));
        readers.push_back(reader);
    }

    Merger::Statistics merge_stats;
    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
                                           new_tablet->tablet_schema(), readers, rowset_writer,
                                           &merge_stats));

    _add_merged_rows(merge_stats.merged_rows);
    _add_filtered_rows(merge_stats.filtered_rows);
    return Status::OK();
}
686
687
0
// Entry point for an alter-tablet (v2) request.
//
// Validates that the request carries a descriptor table, looks up the base
// tablet, tries to take its schema-change lock without blocking, and then runs
// _do_process_alter_tablet_v2() while holding that lock.
//
// Returns INVALID_ARGUMENT when desc_tbl is missing, TABLE_NOT_FOUND when the
// base tablet does not exist, TRY_LOCK_FAILED when the lock is already taken,
// otherwise the result of _do_process_alter_tablet_v2().
Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
    if (!request.__isset.desc_tbl) {
        return Status::Error<INVALID_ARGUMENT>(
                "desc_tbl is not set. Maybe the FE version is not equal to the BE "
                "version.");
    }

    LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
              << ", new_tablet_id=" << request.new_tablet_id
              << ", alter_version=" << request.alter_version;

    TabletSharedPtr base_tablet =
            StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
    if (base_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
                                              request.base_tablet_id);
    }

    // Non-blocking attempt to take the base tablet's schema-change lock; when
    // acquired it stays held until this function returns.
    std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(),
                                                    std::try_to_lock);
    if (!schema_change_lock.owns_lock()) {
        return Status::Error<TRY_LOCK_FAILED>("failed to obtain schema change lock. base_tablet={}",
                                              request.base_tablet_id);
    }

    Status alter_result = _do_process_alter_tablet_v2(request);
    LOG(INFO) << "finished alter tablet process, res=" << alter_result;
    return alter_result;
}
716
717
// Out-of-class definition of the static member SchemaChangeHandler::_mutex.
// NOTE(review): presumably guards _tablet_ids_in_converting — confirm against its users.
std::shared_mutex SchemaChangeHandler::_mutex;
718
// Out-of-class definition of the static member
// SchemaChangeHandler::_tablet_ids_in_converting.
// NOTE(review): presumably the ids of tablets currently being converted — confirm against its users.
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
719
720
// In the past schema change and rollup will create new tablet  and will wait for txns starting before the task to finished
721
// It will cost a lot of time to wait and the task is very difficult to understand.
722
// In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
723
// The admin should upgrade all BE and then upgrade FE.
724
// Should delete the old code after upgrade finished.
725
0
Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) {
726
0
    Status res = Status::OK();
727
0
    TabletSharedPtr base_tablet =
728
0
            StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
729
0
    if (base_tablet == nullptr) {
730
0
        return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
731
0
                                              request.base_tablet_id);
732
0
    }
733
734
0
    signal::tablet_id = base_tablet->get_table_id();
735
736
    // new tablet has to exist
737
0
    TabletSharedPtr new_tablet =
738
0
            StorageEngine::instance()->tablet_manager()->get_tablet(request.new_tablet_id);
739
0
    if (new_tablet == nullptr) {
740
0
        return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
741
0
                                              request.new_tablet_id);
742
0
    }
743
744
    // check if tablet's state is not_ready, if it is ready, it means the tablet already finished
745
    // check whether the tablet's max continuous version == request.version
746
0
    if (new_tablet->tablet_state() != TABLET_NOTREADY) {
747
0
        res = _validate_alter_result(new_tablet, request);
748
0
        LOG(INFO) << "tablet's state=" << new_tablet->tablet_state()
749
0
                  << " the convert job already finished, check its version"
750
0
                  << " res=" << res;
751
0
        return res;
752
0
    }
753
0
    new_tablet->set_alter_failed(false);
754
0
    Defer defer([&new_tablet] {
755
        // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
756
0
        if (new_tablet->tablet_state() != TABLET_RUNNING) {
757
0
            new_tablet->set_alter_failed(true);
758
0
        }
759
0
    });
760
761
0
    LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
762
0
                 "to new tablet"
763
0
              << " base_tablet=" << base_tablet->full_name()
764
0
              << " new_tablet=" << new_tablet->full_name();
765
766
0
    std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock);
767
0
    if (!base_migration_rlock.owns_lock()) {
768
0
        return Status::Error<TRY_LOCK_FAILED>(
769
0
                "SchemaChangeHandler::_do_process_alter_tablet_v2 get lock failed");
770
0
    }
771
0
    std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock);
772
0
    if (!new_migration_rlock.owns_lock()) {
773
0
        return Status::Error<TRY_LOCK_FAILED>(
774
0
                "SchemaChangeHandler::_do_process_alter_tablet_v2 get lock failed");
775
0
    }
776
777
0
    std::vector<Version> versions_to_be_changed;
778
0
    int64_t end_version = -1;
779
    // reader_context is stack variables, it's lifetime should keep the same
780
    // with rs_readers
781
0
    RowsetReaderContext reader_context;
782
0
    std::vector<RowSetSplits> rs_splits;
783
    // delete handlers for new tablet
784
0
    DeleteHandler delete_handler;
785
0
    std::vector<ColumnId> return_columns;
786
    // Create a new tablet schema, should merge with dropped columns in light weight schema change
787
0
    TabletSchemaSPtr base_tablet_schema = std::make_shared<TabletSchema>();
788
0
    base_tablet_schema->copy_from(*base_tablet->tablet_schema());
789
0
    if (!request.columns.empty() && request.columns[0].col_unique_id >= 0) {
790
0
        base_tablet_schema->clear_columns();
791
0
        for (const auto& column : request.columns) {
792
0
            base_tablet_schema->append_column(TabletColumn(column));
793
0
        }
794
        // The request only include column info, do not include bitmap or bloomfilter index info,
795
        // So we also need to copy index info from the real base tablet
796
0
        base_tablet_schema->update_index_info_from(*base_tablet->tablet_schema());
797
0
    }
798
    // Use tablet schema directly from base tablet, they are the newest schema, not contain
799
    // dropped column during light weight schema change.
800
    // But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through
801
    // a tablet schema, then use request schema.
802
0
    size_t num_cols = request.columns.empty() ? base_tablet->tablet_schema()->num_columns()
803
0
                                              : request.columns.size();
804
0
    return_columns.resize(num_cols);
805
0
    for (int i = 0; i < num_cols; ++i) {
806
0
        return_columns[i] = i;
807
0
    }
808
809
    // begin to find deltas to convert from base tablet to new tablet so that
810
    // obtain base tablet and new tablet's push lock and header write lock to prevent loading data
811
0
    {
812
0
        std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
813
0
        std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
814
0
        std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock());
815
0
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
816
0
        std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock());
817
818
0
        do {
819
0
            RowsetSharedPtr max_rowset;
820
            // get history data to be converted and it will check if there is hold in base tablet
821
0
            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
822
0
            if (!res) {
823
0
                LOG(WARNING) << "fail to get version to be changed. res=" << res;
824
0
                break;
825
0
            }
826
827
0
            DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", {
828
0
                res = Status::InternalError(
829
0
                        "inject alter tablet failed. base_tablet={}, new_tablet={}",
830
0
                        request.base_tablet_id, request.new_tablet_id);
831
0
                LOG(WARNING) << "inject error. res=" << res;
832
0
                break;
833
0
            });
834
835
            // should check the max_version >= request.alter_version, if not the convert is useless
836
0
            if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
837
0
                res = Status::InternalError(
838
0
                        "base tablet's max version={} is less than request version={}",
839
0
                        (max_rowset == nullptr ? 0 : max_rowset->end_version()),
840
0
                        request.alter_version);
841
0
                break;
842
0
            }
843
            // before calculating version_to_be_changed,
844
            // remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
845
0
            LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite."
846
0
                      << " new_tablet=" << new_tablet->full_name();
847
0
            std::vector<RowsetSharedPtr> rowsets_to_delete;
848
0
            std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
849
0
            new_tablet->acquire_version_and_rowsets(&version_rowsets);
850
0
            std::sort(version_rowsets.begin(), version_rowsets.end(),
851
0
                      [](const std::pair<Version, RowsetSharedPtr>& l,
852
0
                         const std::pair<Version, RowsetSharedPtr>& r) {
853
0
                          return l.first.first < r.first.first;
854
0
                      });
855
0
            for (auto& pair : version_rowsets) {
856
0
                if (pair.first.second <= max_rowset->end_version()) {
857
0
                    rowsets_to_delete.push_back(pair.second);
858
0
                } else if (pair.first.first <= max_rowset->end_version()) {
859
                    // If max version is [X-10] and new tablet has version [7-9][10-12],
860
                    // we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
861
                    // cross: [X-10] [10-12].
862
                    // So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
863
0
                    return Status::Error<VERSION_ALREADY_MERGED>(
864
0
                            "New tablet has a version {} crossing base tablet's max_version={}",
865
0
                            pair.first.to_string(), max_rowset->end_version());
866
0
                }
867
0
            }
868
0
            new_tablet->delete_rowsets(rowsets_to_delete, false);
869
            // inherit cumulative_layer_point from base_tablet
870
            // check if new_tablet.ce_point > base_tablet.ce_point?
871
0
            new_tablet->set_cumulative_layer_point(-1);
872
            // save tablet meta
873
0
            new_tablet->save_meta();
874
0
            for (auto& rowset : rowsets_to_delete) {
875
                // do not call rowset.remove directly, using gc thread to delete it
876
0
                StorageEngine::instance()->add_unused_rowset(rowset);
877
0
            }
878
879
            // init one delete handler
880
0
            for (auto& version : versions_to_be_changed) {
881
0
                end_version = std::max(end_version, version.second);
882
0
            }
883
884
            // acquire data sources correspond to history versions
885
0
            base_tablet->capture_rs_readers(versions_to_be_changed, &rs_splits);
886
0
            if (rs_splits.empty()) {
887
0
                res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
888
0
                        "fail to acquire all data sources. version_num={}, data_source_num={}",
889
0
                        versions_to_be_changed.size(), rs_splits.size());
890
0
                break;
891
0
            }
892
0
            std::vector<RowsetMetaSharedPtr> del_preds;
893
0
            for (auto&& split : rs_splits) {
894
0
                auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
895
0
                if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) {
896
0
                    continue;
897
0
                }
898
0
                base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
899
0
                del_preds.push_back(rs_meta);
900
0
            }
901
0
            res = delete_handler.init(base_tablet_schema, del_preds, end_version);
902
0
            if (!res) {
903
0
                LOG(WARNING) << "init delete handler failed. base_tablet="
904
0
                             << base_tablet->full_name() << ", end_version=" << end_version;
905
0
                break;
906
0
            }
907
908
0
            reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
909
0
            reader_context.tablet_schema = base_tablet_schema;
910
0
            reader_context.need_ordered_result = true;
911
0
            reader_context.delete_handler = &delete_handler;
912
0
            reader_context.return_columns = &return_columns;
913
0
            reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
914
0
            reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
915
0
            reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
916
0
            reader_context.delete_bitmap = &base_tablet->tablet_meta()->delete_bitmap();
917
0
            reader_context.version = Version(0, end_version);
918
0
            for (auto& rs_split : rs_splits) {
919
0
                res = rs_split.rs_reader->init(&reader_context);
920
0
                if (!res) {
921
0
                    LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
922
0
                    break;
923
0
                }
924
0
            }
925
0
        } while (false);
926
0
    }
927
928
0
    do {
929
0
        if (!res) {
930
0
            break;
931
0
        }
932
0
        SchemaChangeParams sc_params;
933
934
0
        DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl);
935
0
        sc_params.base_tablet = base_tablet;
936
0
        sc_params.new_tablet = new_tablet;
937
0
        sc_params.ref_rowset_readers.reserve(rs_splits.size());
938
0
        for (RowSetSplits& split : rs_splits) {
939
0
            sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
940
0
        }
941
0
        sc_params.delete_handler = &delete_handler;
942
0
        sc_params.base_tablet_schema = base_tablet_schema;
943
0
        sc_params.be_exec_version = request.be_exec_version;
944
0
        DCHECK(request.__isset.alter_tablet_type);
945
0
        switch (request.alter_tablet_type) {
946
0
        case TAlterTabletType::SCHEMA_CHANGE:
947
0
            sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
948
0
            break;
949
0
        case TAlterTabletType::ROLLUP:
950
0
            sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
951
0
            break;
952
0
        case TAlterTabletType::MIGRATION:
953
0
            sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
954
0
            break;
955
0
        }
956
0
        if (request.__isset.materialized_view_params) {
957
0
            for (auto item : request.materialized_view_params) {
958
0
                AlterMaterializedViewParam mv_param;
959
0
                mv_param.column_name = item.column_name;
960
961
0
                if (item.__isset.mv_expr) {
962
0
                    mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
963
0
                }
964
0
                sc_params.materialized_params_map.insert(
965
0
                        std::make_pair(item.column_name, mv_param));
966
0
            }
967
0
        }
968
0
        {
969
0
            std::lock_guard<std::shared_mutex> wrlock(_mutex);
970
0
            _tablet_ids_in_converting.insert(new_tablet->tablet_id());
971
0
        }
972
0
        int64_t real_alter_version = 0;
973
0
        res = _convert_historical_rowsets(sc_params, &real_alter_version);
974
0
        {
975
0
            std::lock_guard<std::shared_mutex> wrlock(_mutex);
976
0
            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
977
0
        }
978
0
        if (!res) {
979
0
            break;
980
0
        }
981
982
0
        DCHECK_GE(real_alter_version, request.alter_version);
983
984
0
        if (new_tablet->keys_type() == UNIQUE_KEYS &&
985
0
            new_tablet->enable_unique_key_merge_on_write()) {
986
0
            res = _calc_delete_bitmap_for_mow_table(new_tablet, real_alter_version);
987
0
            if (!res) {
988
0
                break;
989
0
            }
990
0
        } else {
991
            // set state to ready
992
0
            std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
993
0
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
994
0
            res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
995
0
            if (!res) {
996
0
                break;
997
0
            }
998
0
            new_tablet->save_meta();
999
0
        }
1000
0
    } while (false);
1001
1002
0
    if (res) {
1003
        // _validate_alter_result should be outside the above while loop.
1004
        // to avoid requiring the header lock twice.
1005
0
        res = _validate_alter_result(new_tablet, request);
1006
0
    }
1007
1008
    // if failed convert history data, then just remove the new tablet
1009
0
    if (!res) {
1010
0
        LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name()
1011
0
                     << ", drop new_tablet=" << new_tablet->full_name();
1012
        // do not drop the new tablet and its data. GC thread will
1013
0
    }
1014
1015
0
    return res;
1016
0
}
1017
1018
0
bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
1019
0
    std::shared_lock rdlock(_mutex);
1020
0
    return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
1021
0
}
1022
1023
// Collects the versions of the base tablet that have to be converted.
//
// @param base_tablet             tablet to read the versions from.
// @param versions_to_be_changed  out: consistent version path covering [0, max version].
// @param max_rowset              out: the rowset holding the max version of the base tablet.
// @return ALTER_DELTA_DOES_NOT_EXISTS when the tablet has no rowset at all, otherwise the status
//         of capture_consistent_versions().
Status SchemaChangeHandler::_get_versions_to_be_changed(
        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed,
        RowsetSharedPtr* max_rowset) {
    RowsetSharedPtr newest = base_tablet->rowset_with_max_version();
    if (newest == nullptr) {
        return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. base_tablet={}",
                                                          base_tablet->full_name());
    }
    *max_rowset = newest;

    const Version full_range(0, newest->version().second);
    Status captured = base_tablet->capture_consistent_versions(full_range, versions_to_be_changed,
                                                               false, false);
    if (!captured.ok()) {
        return captured;
    }
    return Status::OK();
}
1038
1039
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
1040
// converted from a base tablet, only used for the mow table now.
1041
Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
1042
0
                                                        int64_t* real_alter_version) {
1043
0
    LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
1044
0
              << " base_tablet=" << sc_params.base_tablet->full_name()
1045
0
              << ", new_tablet=" << sc_params.new_tablet->full_name();
1046
1047
    // find end version
1048
0
    int32_t end_version = -1;
1049
0
    for (size_t i = 0; i < sc_params.ref_rowset_readers.size(); ++i) {
1050
0
        if (sc_params.ref_rowset_readers[i]->version().second > end_version) {
1051
0
            end_version = sc_params.ref_rowset_readers[i]->version().second;
1052
0
        }
1053
0
    }
1054
1055
    // Add filter information in change, and filter column information will be set in _parse_request
1056
    // And filter some data every time the row block changes
1057
0
    BlockChanger changer(sc_params.new_tablet->tablet_schema(), *sc_params.desc_tbl);
1058
1059
0
    bool sc_sorting = false;
1060
0
    bool sc_directly = false;
1061
1062
    // a.Parse the Alter request and convert it into an internal representation
1063
0
    Status res = _parse_request(sc_params, &changer, &sc_sorting, &sc_directly);
1064
0
    LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
1065
0
              << ", sc_directly: " << sc_directly
1066
0
              << ", base_tablet=" << sc_params.base_tablet->full_name()
1067
0
              << ", new_tablet=" << sc_params.new_tablet->full_name();
1068
1069
0
    auto process_alter_exit = [&]() -> Status {
1070
0
        {
1071
            // save tablet meta here because rowset meta is not saved during add rowset
1072
0
            std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
1073
0
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
1074
0
            sc_params.new_tablet->save_meta();
1075
0
        }
1076
0
        if (res) {
1077
0
            Version test_version(0, end_version);
1078
0
            res = sc_params.new_tablet->check_version_integrity(test_version);
1079
0
        }
1080
1081
0
        LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
1082
0
                  << "base_tablet=" << sc_params.base_tablet->full_name()
1083
0
                  << ", new_tablet=" << sc_params.new_tablet->full_name();
1084
0
        return res;
1085
0
    };
1086
1087
0
    if (!res) {
1088
0
        LOG(WARNING) << "failed to parse the request. res=" << res;
1089
0
        return process_alter_exit();
1090
0
    }
1091
1092
0
    if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) {
1093
0
        res = Status::Error<SCHEMA_SCHEMA_INVALID>(
1094
0
                "Don't support to add materialized view by linked schema change");
1095
0
        return process_alter_exit();
1096
0
    }
1097
1098
    // b. Generate historical data converter
1099
0
    auto sc_procedure = get_sc_procedure(changer, sc_sorting, sc_directly);
1100
1101
    // c.Convert historical data
1102
0
    bool have_failure_rowset = false;
1103
0
    for (auto& rs_reader : sc_params.ref_rowset_readers) {
1104
0
        VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
1105
0
                   << "-" << rs_reader->version().second;
1106
1107
        // set status for monitor
1108
        // As long as there is a new_table as running, ref table is set as running
1109
        // NOTE If the first sub_table fails first, it will continue to go as normal here
1110
0
        TabletSharedPtr new_tablet = sc_params.new_tablet;
1111
        // When tablet create new rowset writer, it may change rowset type, in this case
1112
        // linked schema change will not be used.
1113
0
        std::unique_ptr<RowsetWriter> rowset_writer;
1114
0
        RowsetWriterContext context;
1115
0
        context.version = rs_reader->version();
1116
0
        context.rowset_state = VISIBLE;
1117
0
        context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
1118
0
        context.tablet_schema = new_tablet->tablet_schema();
1119
0
        context.newest_write_timestamp = rs_reader->newest_write_timestamp();
1120
0
        context.fs = io::global_local_filesystem();
1121
0
        context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
1122
0
        Status status = new_tablet->create_rowset_writer(context, &rowset_writer);
1123
0
        if (!status.ok()) {
1124
0
            res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}",
1125
0
                                                     status.to_string());
1126
0
            return process_alter_exit();
1127
0
        }
1128
1129
0
        if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
1130
0
                                        sc_params.base_tablet, sc_params.base_tablet_schema);
1131
0
            !res) {
1132
0
            LOG(WARNING) << "failed to process the version."
1133
0
                         << " version=" << rs_reader->version().first << "-"
1134
0
                         << rs_reader->version().second << ", " << res.to_string();
1135
0
            return process_alter_exit();
1136
0
        }
1137
        // Add the new version of the data to the header
1138
        // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
1139
0
        std::lock_guard<std::mutex> lock(sc_params.new_tablet->get_push_lock());
1140
0
        RowsetSharedPtr new_rowset;
1141
0
        if (!(res = rowset_writer->build(new_rowset)).ok()) {
1142
0
            LOG(WARNING) << "failed to build rowset, exit alter process";
1143
0
            return process_alter_exit();
1144
0
        }
1145
0
        res = sc_params.new_tablet->add_rowset(new_rowset);
1146
0
        if (res.is<PUSH_VERSION_ALREADY_EXIST>()) {
1147
0
            LOG(WARNING) << "version already exist, version revert occurred. "
1148
0
                         << "tablet=" << sc_params.new_tablet->full_name() << ", version='"
1149
0
                         << rs_reader->version().first << "-" << rs_reader->version().second;
1150
0
            StorageEngine::instance()->add_unused_rowset(new_rowset);
1151
0
            have_failure_rowset = true;
1152
0
            res = Status::OK();
1153
0
        } else if (!res) {
1154
0
            LOG(WARNING) << "failed to register new version. "
1155
0
                         << " tablet=" << sc_params.new_tablet->full_name()
1156
0
                         << ", version=" << rs_reader->version().first << "-"
1157
0
                         << rs_reader->version().second;
1158
0
            StorageEngine::instance()->add_unused_rowset(new_rowset);
1159
0
            return process_alter_exit();
1160
0
        } else {
1161
0
            VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name()
1162
0
                        << ", version=" << rs_reader->version().first << "-"
1163
0
                        << rs_reader->version().second;
1164
0
        }
1165
0
        if (!have_failure_rowset) {
1166
0
            *real_alter_version = rs_reader->version().second;
1167
0
        }
1168
1169
0
        VLOG_TRACE << "succeed to convert a history version."
1170
0
                   << " version=" << rs_reader->version().first << "-"
1171
0
                   << rs_reader->version().second;
1172
0
    }
1173
1174
    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
1175
0
    return process_alter_exit();
1176
0
}
1177
1178
// @static
1179
// Analyze the mapping of the column and the mapping of the filter key
1180
Status SchemaChangeHandler::_parse_request(const SchemaChangeParams& sc_params,
1181
                                           BlockChanger* changer, bool* sc_sorting,
1182
0
                                           bool* sc_directly) {
1183
0
    changer->set_type(sc_params.alter_tablet_type);
1184
0
    changer->set_compatible_version(sc_params.be_exec_version);
1185
1186
0
    TabletSharedPtr base_tablet = sc_params.base_tablet;
1187
0
    TabletSharedPtr new_tablet = sc_params.new_tablet;
1188
0
    TabletSchemaSPtr base_tablet_schema = sc_params.base_tablet_schema;
1189
0
    const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map =
1190
0
            sc_params.materialized_params_map;
1191
0
    DescriptorTbl desc_tbl = *sc_params.desc_tbl;
1192
    // set column mapping
1193
0
    for (int i = 0, new_schema_size = new_tablet->tablet_schema()->num_columns();
1194
0
         i < new_schema_size; ++i) {
1195
0
        const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
1196
0
        const std::string& column_name = new_column.name();
1197
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1198
0
        column_mapping->new_column = &new_column;
1199
1200
0
        if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
1201
0
            auto mvParam = materialized_function_map.find(column_name)->second;
1202
0
            column_mapping->expr = mvParam.expr;
1203
0
            if (column_mapping->expr != nullptr) {
1204
0
                continue;
1205
0
            } else if (sc_params.alter_tablet_type != ROLLUP) {
1206
0
                return Status::Error<CE_CMD_PARAMS_ERROR>(
1207
0
                        "referenced column was missing. [column={} ,origin_column={}]", column_name,
1208
0
                        mvParam.origin_column_name);
1209
0
            }
1210
0
        }
1211
1212
0
        int32_t column_index = base_tablet_schema->field_index(column_name);
1213
0
        if (column_index >= 0) {
1214
0
            column_mapping->ref_column = column_index;
1215
0
            continue;
1216
0
        }
1217
1218
0
        if (column_name.find("__doris_shadow_") == 0) {
1219
            // Should delete in the future, just a protection for bug.
1220
0
            LOG(INFO) << "a shadow column is encountered " << column_name;
1221
0
            return Status::InternalError("failed due to operate on shadow column");
1222
0
        }
1223
        // Newly added column go here
1224
0
        column_mapping->ref_column = -1;
1225
1226
0
        if (i < base_tablet_schema->num_short_key_columns()) {
1227
0
            *sc_directly = true;
1228
0
        }
1229
0
        RETURN_IF_ERROR(
1230
0
                _init_column_mapping(column_mapping, new_column, new_column.default_value()));
1231
1232
0
        LOG(INFO) << "A column with default value will be added after schema changing. "
1233
0
                  << "column=" << column_name << ", default_value=" << new_column.default_value()
1234
0
                  << " to table " << new_tablet->get_table_id();
1235
0
    }
1236
1237
0
    if (materialized_function_map.count(WHERE_SIGN)) {
1238
0
        changer->set_where_expr(materialized_function_map.find(WHERE_SIGN)->second.expr);
1239
0
    }
1240
1241
    // If the reference sequence of the Key column is out of order, it needs to be reordered
1242
0
    int num_default_value = 0;
1243
1244
0
    for (int i = 0, new_schema_size = new_tablet->num_key_columns(); i < new_schema_size; ++i) {
1245
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1246
1247
0
        if (!column_mapping->has_reference()) {
1248
0
            num_default_value++;
1249
0
            continue;
1250
0
        }
1251
1252
0
        if (column_mapping->ref_column != i - num_default_value) {
1253
0
            *sc_sorting = true;
1254
0
            return Status::OK();
1255
0
        }
1256
0
    }
1257
1258
0
    TabletSchemaSPtr new_tablet_schema = new_tablet->tablet_schema();
1259
0
    if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
1260
        // only when base table is dup and mv is agg
1261
        // the rollup job must be reagg.
1262
0
        *sc_sorting = true;
1263
0
        return Status::OK();
1264
0
    }
1265
1266
    // If the sort of key has not been changed but the new keys num is less then base's,
1267
    // the new table should be re agg.
1268
    // So we also need to set sc_sorting = true.
1269
    // A, B, C are keys(sort keys), D is value
1270
    // followings need resort:
1271
    //      old keys:    A   B   C   D
1272
    //      new keys:    A   B
1273
0
    if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS &&
1274
0
        new_tablet->num_key_columns() < base_tablet_schema->num_key_columns()) {
1275
        // this is a table with aggregate key type, and num of key columns in new schema
1276
        // is less, which means the data in new tablet should be more aggregated.
1277
        // so we use sorting schema change to sort and merge the data.
1278
0
        *sc_sorting = true;
1279
0
        return Status::OK();
1280
0
    }
1281
1282
0
    if (sc_params.alter_tablet_type == ROLLUP) {
1283
0
        *sc_directly = true;
1284
0
        return Status::OK();
1285
0
    }
1286
1287
0
    if (new_tablet->enable_unique_key_merge_on_write() &&
1288
0
        new_tablet->num_key_columns() > base_tablet_schema->num_key_columns()) {
1289
0
        *sc_directly = true;
1290
0
        return Status::OK();
1291
0
    }
1292
1293
0
    if (base_tablet_schema->num_short_key_columns() != new_tablet->num_short_key_columns()) {
1294
        // the number of short_keys changed, can't do linked schema change
1295
0
        *sc_directly = true;
1296
0
        return Status::OK();
1297
0
    }
1298
1299
0
    if (!sc_params.delete_handler->empty()) {
1300
        // there exists delete condition in header, can't do linked schema change
1301
0
        *sc_directly = true;
1302
0
        return Status::OK();
1303
0
    }
1304
1305
0
    for (size_t i = 0; i < new_tablet->num_columns(); ++i) {
1306
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1307
0
        if (column_mapping->expr != nullptr) {
1308
0
            *sc_directly = true;
1309
0
            return Status::OK();
1310
0
        } else if (column_mapping->ref_column >= 0) {
1311
0
            const auto& column_new = new_tablet_schema->column(i);
1312
0
            const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
1313
            // index changed
1314
0
            if (column_new.is_bf_column() != column_old.is_bf_column() ||
1315
0
                column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
1316
0
                *sc_directly = true;
1317
0
                return Status::OK();
1318
0
            }
1319
0
        }
1320
0
    }
1321
1322
    // if rs_reader has remote files, link schema change is not supported,
1323
    // use directly schema change instead.
1324
0
    if (!(*sc_directly) && !(*sc_sorting)) {
1325
        // check has remote rowset
1326
0
        for (auto& rs_reader : sc_params.ref_rowset_readers) {
1327
0
            if (!rs_reader->rowset()->is_local()) {
1328
0
                *sc_directly = true;
1329
0
                break;
1330
0
            }
1331
0
        }
1332
0
    }
1333
1334
0
    return Status::OK();
1335
0
}
1336
1337
// Creates the default value of a newly added column and stores it in `column_mapping`.
//
// @param column_mapping  mapping whose default_value is (re)created here.
// @param column_schema   schema of the new column; provides nullability, precision and frac.
// @param value           textual default value; an empty string on a nullable column means NULL.
// @return MEM_ALLOC_FAILED when the wrapper field cannot be created, the error of from_string()
//         when `value` cannot be parsed, otherwise OK.
Status SchemaChangeHandler::_init_column_mapping(ColumnMapping* column_mapping,
                                                 const TabletColumn& column_schema,
                                                 const std::string& value) {
    column_mapping->default_value = WrapperField::create(column_schema);

    if (column_mapping->default_value == nullptr) {
        return Status::Error<MEM_ALLOC_FAILED>("column_mapping->default_value is nullptr");
    }

    if (column_schema.is_nullable() && value.length() == 0) {
        column_mapping->default_value->set_null();
    } else {
        // Propagate parse failures instead of silently keeping an unspecified default value.
        RETURN_IF_ERROR(column_mapping->default_value->from_string(
                value, column_schema.precision(), column_schema.frac()));
    }

    return Status::OK();
}
1355
1356
// Checks that the new tablet really holds the converted data.
//
// @param new_tablet  the tablet produced by the alter task.
// @param request     the alter request; only alter_version is used.
// @return InternalError when the max continuous version of the tablet is below
//         request.alter_version, FILE_NOT_EXIST when a rowset reports missing files,
//         otherwise OK.
Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
                                                   const TAlterTabletReqV2& request) {
    Version continuous = {-1, 0};
    new_tablet->max_continuous_version_from_beginning(&continuous);
    LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name()
              << ", start_version=" << continuous.first
              << ", end_version=" << continuous.second;
    if (continuous.second < request.alter_version) {
        return Status::InternalError("result version={} is less than request version={}",
                                     continuous.second, request.alter_version);
    }

    // Snapshot the rowsets under the header read lock; the file checks run without the lock.
    std::vector<std::pair<Version, RowsetSharedPtr>> snapshot;
    {
        std::shared_lock header_rlock(new_tablet->get_header_lock());
        new_tablet->acquire_version_and_rowsets(&snapshot);
    }

    const bool all_files_present = std::all_of(
            snapshot.begin(), snapshot.end(),
            [](const std::pair<Version, RowsetSharedPtr>& entry) {
                return entry.second->check_file_exist();
            });
    if (!all_files_present) {
        return Status::Error<FILE_NOT_EXIST>(
                "SchemaChangeHandler::_validate_alter_result meet invalid rowset");
    }
    return Status::OK();
}
1382
1383
// For unique with merge-on-write table, should process delete bitmap here.
1384
// 1. During double write, the newly imported rowsets do not calculate
1385
// delete bitmap and publish successfully.
1386
// 2. After conversion, calculate delete bitmap for the rowsets imported
1387
// during double write. During this period, new data can still be imported
1388
// without calculating delete bitmap and publish successfully.
1389
// 3. Block the new publish, calculate the delete bitmap of the
1390
// incremental rowsets.
1391
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
1392
// data will calculate delete bitmap.
1393
// Computes the delete bitmap for rowsets that landed on `new_tablet` after
// `alter_version`, then switches the tablet to TABLET_RUNNING and saves its meta.
//
// The work is done in two passes:
//   - "step 2": rowsets in (alter_version, max_version] are processed one at a
//     time, taking the rowset-update lock and a shared header lock per rowset.
//   - "step 3": with the rowset-update lock and the exclusive header lock held
//     until return, rowsets in (max_version, new_max_version] are processed.
//   - "step 4": the tablet state is set to TABLET_RUNNING and the meta is saved,
//     still under the step-3 locks.
//
// @param new_tablet     tablet whose delete bitmap is being completed.
// @param alter_version  last version already covered by the alter conversion.
// @return the first non-OK status from capture_consistent_rowsets(),
//         update_delete_bitmap_without_lock() or set_tablet_state();
//         Status::OK() otherwise. A debug point may inject an InternalError.
Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
                                                              int64_t alter_version) {
    DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
            LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
            return Status::InternalError("debug schema change calc delete bitmap random failed");
        }
    });

    // can't do compaction when calc delete bitmap, if the rowset being calculated does
    // a compaction, it may cause the delete bitmap to be missed.
    // Both compaction locks are held until this function returns.
    std::lock_guard base_compaction_lock(new_tablet->get_base_compaction_lock());
    std::lock_guard cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());

    // step 2
    int64_t max_version = new_tablet->max_version().second;
    std::vector<RowsetSharedPtr> rowsets;
    if (alter_version < max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
                  << " new_tablet=" << new_tablet->tablet_id();
        // Shared header lock covers only the capture; it is dropped at the end of this scope.
        std::shared_lock<std::shared_mutex> capture_rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(
                new_tablet->capture_consistent_rowsets({alter_version + 1, max_version}, &rowsets));
    }
    // Iterate by const reference: `rowsets` owns the pointers for the whole loop,
    // so copying each shared_ptr would only add atomic refcount updates.
    for (const auto& rowset_ptr : rowsets) {
        // Locks are re-acquired per rowset (update lock first, then shared header lock)
        // and released at the end of each iteration.
        std::lock_guard<std::mutex> rowset_update_lock(new_tablet->get_rowset_update_lock());
        std::shared_lock<std::shared_mutex> header_rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }

    // step 3
    // Same acquisition order as in step 2 (update lock, then header lock), but the
    // header lock is exclusive here and both stay held through step 4.
    std::lock_guard<std::mutex> final_update_lock(new_tablet->get_rowset_update_lock());
    std::lock_guard<std::shared_mutex> header_wlock(new_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
    int64_t new_max_version = new_tablet->max_version_unlocked().second;
    rowsets.clear();
    if (max_version < new_max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "incremental rowsets for version: " << max_version + 1 << "-"
                  << new_max_version << " new_tablet=" << new_tablet->tablet_id();
        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 1, new_max_version},
                                                               &rowsets));
    }
    for (const auto& rowset_ptr : rowsets) {
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }

    // step 4
    RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
    new_tablet->save_meta();
    return Status::OK();
}
1445
1446
} // namespace doris