Coverage Report

Created: 2025-09-11 19:59

/root/doris/be/src/olap/schema_change.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/schema_change.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
#include <thrift/protocol/TDebugProtocol.h>
23
24
#include <algorithm>
25
#include <exception>
26
#include <map>
27
#include <memory>
28
#include <mutex>
29
#include <roaring/roaring.hh>
30
#include <tuple>
31
#include <utility>
32
33
#include "agent/be_exec_version_manager.h"
34
#include "cloud/cloud_schema_change_job.h"
35
#include "cloud/config.h"
36
#include "common/consts.h"
37
#include "common/logging.h"
38
#include "common/signal_handler.h"
39
#include "common/status.h"
40
#include "exec/schema_scanner/schema_metadata_name_ids_scanner.h"
41
#include "gutil/integral_types.h"
42
#include "gutil/strings/numbers.h"
43
#include "io/fs/file_system.h"
44
#include "io/io_common.h"
45
#include "olap/base_tablet.h"
46
#include "olap/data_dir.h"
47
#include "olap/delete_handler.h"
48
#include "olap/field.h"
49
#include "olap/iterators.h"
50
#include "olap/merger.h"
51
#include "olap/olap_common.h"
52
#include "olap/olap_define.h"
53
#include "olap/rowset/beta_rowset.h"
54
#include "olap/rowset/pending_rowset_helper.h"
55
#include "olap/rowset/rowset_meta.h"
56
#include "olap/rowset/rowset_reader_context.h"
57
#include "olap/rowset/rowset_writer_context.h"
58
#include "olap/rowset/segment_v2/column_reader.h"
59
#include "olap/rowset/segment_v2/inverted_index_desc.h"
60
#include "olap/rowset/segment_v2/inverted_index_writer.h"
61
#include "olap/rowset/segment_v2/segment.h"
62
#include "olap/schema.h"
63
#include "olap/segment_loader.h"
64
#include "olap/storage_engine.h"
65
#include "olap/tablet.h"
66
#include "olap/tablet_fwd.h"
67
#include "olap/tablet_manager.h"
68
#include "olap/tablet_meta.h"
69
#include "olap/tablet_schema.h"
70
#include "olap/types.h"
71
#include "olap/utils.h"
72
#include "olap/wrapper_field.h"
73
#include "runtime/exec_env.h"
74
#include "runtime/memory/mem_tracker.h"
75
#include "runtime/runtime_state.h"
76
#include "util/debug_points.h"
77
#include "util/defer_op.h"
78
#include "util/trace.h"
79
#include "vec/aggregate_functions/aggregate_function.h"
80
#include "vec/aggregate_functions/aggregate_function_reader.h"
81
#include "vec/columns/column.h"
82
#include "vec/columns/column_nullable.h"
83
#include "vec/common/assert_cast.h"
84
#include "vec/common/schema_util.h"
85
#include "vec/core/block.h"
86
#include "vec/core/column_with_type_and_name.h"
87
#include "vec/exprs/vexpr.h"
88
#include "vec/exprs/vexpr_context.h"
89
#include "vec/olap/olap_data_convertor.h"
90
91
namespace doris {
92
class CollectionValue;
93
94
using namespace ErrorCode;
95
96
constexpr int ALTER_TABLE_BATCH_SIZE = 4064;
97
98
class MultiBlockMerger {
99
public:
100
0
    MultiBlockMerger(BaseTabletSPtr tablet) : _tablet(tablet), _cmp(*tablet) {}
101
102
    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
103
0
                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
104
0
        int rows = 0;
105
0
        for (const auto& block : blocks) {
  Branch (105:32): [True: 0, False: 0]
106
0
            rows += block->rows();
107
0
        }
108
0
        if (!rows) {
  Branch (108:13): [True: 0, False: 0]
109
0
            return Status::OK();
110
0
        }
111
112
0
        std::vector<RowRef> row_refs;
113
0
        row_refs.reserve(rows);
114
0
        for (const auto& block : blocks) {
  Branch (114:32): [True: 0, False: 0]
115
0
            for (uint16_t i = 0; i < block->rows(); i++) {
  Branch (115:34): [True: 0, False: 0]
116
0
                row_refs.emplace_back(block.get(), i);
117
0
            }
118
0
        }
119
        // TODO: try to use pdqsort to replace std::sort
120
        // The block version is incremental.
121
0
        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
122
123
0
        auto finalized_block = _tablet->tablet_schema()->create_block();
124
0
        int columns = finalized_block.columns();
125
0
        *merged_rows += rows;
126
127
0
        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
  Branch (127:13): [True: 0, False: 0]
128
0
            auto tablet_schema = _tablet->tablet_schema();
129
0
            int key_number = _tablet->num_key_columns();
130
131
0
            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
132
0
            std::vector<vectorized::AggregateDataPtr> agg_places;
133
134
0
            for (int i = key_number; i < columns; i++) {
  Branch (134:38): [True: 0, False: 0]
135
0
                try {
136
0
                    vectorized::AggregateFunctionPtr function =
137
0
                            tablet_schema->column(i).get_aggregate_function(
138
0
                                    vectorized::AGG_LOAD_SUFFIX,
139
0
                                    tablet_schema->column(i).get_be_exec_version());
140
0
                    if (!function) {
  Branch (140:25): [True: 0, False: 0]
141
0
                        return Status::InternalError(
142
0
                                "could not find aggregate function on column {}, aggregation={}",
143
0
                                tablet_schema->column(i).name(),
144
0
                                tablet_schema->column(i).aggregation());
145
0
                    }
146
0
                    agg_functions.push_back(function);
147
                    // create aggregate data
148
0
                    auto* place = new char[function->size_of_data()];
149
0
                    function->create(place);
150
0
                    agg_places.push_back(place);
151
0
                } catch (...) {
152
0
                    for (int j = 0; j < i - key_number; ++j) {
  Branch (152:37): [True: 0, False: 0]
153
0
                        agg_functions[j]->destroy(agg_places[j]);
154
0
                        delete[] agg_places[j];
155
0
                    }
156
0
                    throw;
157
0
                }
158
0
            }
159
160
0
            DEFER({
Line
Count
Source
46
0
#define DEFER(...) DEFER_FWD(__LINE__, __VA_ARGS__)
Line
Count
Source
45
0
#define DEFER_FWD(n, ...) DEFER_CONCAT(n, __VA_ARGS__)
Line
Count
Source
44
0
#define DEFER_CONCAT(n, ...) const auto defer##n = doris::Defer([&]() { __VA_ARGS__; })
161
0
                for (int i = 0; i < columns - key_number; i++) {
162
0
                    agg_functions[i]->destroy(agg_places[i]);
163
0
                    delete[] agg_places[i];
164
0
                }
165
0
            });
166
167
0
            for (int i = 0; i < rows; i++) {
  Branch (167:29): [True: 0, False: 0]
168
0
                auto row_ref = row_refs[i];
169
0
                for (int j = key_number; j < columns; j++) {
  Branch (169:42): [True: 0, False: 0]
170
0
                    const auto* column_ptr = row_ref.get_column(j).get();
171
0
                    agg_functions[j - key_number]->add(
172
0
                            agg_places[j - key_number],
173
0
                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
174
0
                            &_arena);
175
0
                }
176
177
0
                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
  Branch (177:21): [True: 0, False: 0]
  Branch (177:38): [True: 0, False: 0]
178
0
                    for (int j = 0; j < key_number; j++) {
  Branch (178:37): [True: 0, False: 0]
179
0
                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
180
0
                                *row_ref.get_column(j), row_ref.position);
181
0
                    }
182
183
0
                    for (int j = key_number; j < columns; j++) {
  Branch (183:46): [True: 0, False: 0]
184
0
                        agg_functions[j - key_number]->insert_result_into(
185
0
                                agg_places[j - key_number],
186
0
                                finalized_block.get_by_position(j).column->assume_mutable_ref());
187
0
                        agg_functions[j - key_number]->reset(agg_places[j - key_number]);
188
0
                    }
189
190
0
                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
  Branch (190:25): [True: 0, False: 0]
  Branch (190:42): [True: 0, False: 0]
191
0
                        *merged_rows -= finalized_block.rows();
192
0
                        RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
193
0
                        finalized_block.clear_column_data();
194
0
                    }
195
0
                }
196
0
            }
197
0
        } else {
198
0
            std::vector<RowRef> pushed_row_refs;
199
0
            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
  Branch (199:17): [True: 0, False: 0]
200
0
                std::swap(pushed_row_refs, row_refs);
201
0
            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
  Branch (201:24): [True: 0, False: 0]
202
0
                for (int i = 0; i < rows; i++) {
  Branch (202:33): [True: 0, False: 0]
203
0
                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
  Branch (203:25): [True: 0, False: 0]
  Branch (203:42): [True: 0, False: 0]
204
0
                        pushed_row_refs.push_back(row_refs[i]);
205
0
                    }
206
0
                }
207
0
            }
208
209
            // update real inserted row number
210
0
            rows = pushed_row_refs.size();
211
0
            *merged_rows -= rows;
212
213
0
            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
  Branch (213:29): [True: 0, False: 0]
214
0
                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
215
216
0
                for (int idx = 0; idx < columns; idx++) {
  Branch (216:35): [True: 0, False: 0]
217
0
                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();
218
0
                    for (int j = 0; j < limit; j++) {
  Branch (218:37): [True: 0, False: 0]
219
0
                        auto row_ref = pushed_row_refs[i + j];
220
0
                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
221
0
                    }
222
0
                }
223
0
                RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
224
0
                finalized_block.clear_column_data();
225
0
            }
226
0
        }
227
228
0
        RETURN_IF_ERROR(rowset_writer->flush());
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
229
0
        return Status::OK();
230
0
    }
231
232
private:
233
    struct RowRef {
234
        RowRef(vectorized::Block* block_, uint16_t position_)
235
0
                : block(block_), position(position_) {}
236
0
        vectorized::ColumnPtr get_column(int index) const {
237
0
            return block->get_by_position(index).column;
238
0
        }
239
        const vectorized::Block* block;
240
        uint16_t position;
241
    };
242
243
    struct RowRefComparator {
244
0
        RowRefComparator(const BaseTablet& tablet) : _num_columns(tablet.num_key_columns()) {}
245
246
0
        int compare(const RowRef& lhs, const RowRef& rhs) const {
247
0
            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
248
0
        }
249
250
0
        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
251
0
            return compare(lhs, rhs) < 0;
252
0
        }
253
254
        const size_t _num_columns;
255
    };
256
257
    BaseTabletSPtr _tablet;
258
    RowRefComparator _cmp;
259
    vectorized::Arena _arena;
260
};
261
262
BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl)
263
0
        : _desc_tbl(std::move(desc_tbl)) {
264
0
    _schema_mapping.resize(tablet_schema->num_columns());
265
0
}
266
267
0
BlockChanger::~BlockChanger() {
268
0
    for (auto it = _schema_mapping.begin(); it != _schema_mapping.end(); ++it) {
  Branch (268:45): [True: 0, False: 0]
269
0
        SAFE_DELETE(it->default_value);
Line
Count
Source
168
0
    do {                      \
169
0
        if (nullptr != ptr) { \
  Branch (169:13): [True: 0, False: 0]
170
0
            delete ptr;       \
171
0
            ptr = nullptr;    \
172
0
        }                     \
173
0
    } while (0)
  Branch (173:14): [Folded - Ignored]
270
0
    }
271
0
    _schema_mapping.clear();
272
0
}
273
274
0
ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) {
275
0
    if (column_index >= _schema_mapping.size()) {
  Branch (275:9): [True: 0, False: 0]
276
0
        return nullptr;
277
0
    }
278
279
0
    return &(_schema_mapping[column_index]);
280
0
}
281
282
Status BlockChanger::change_block(vectorized::Block* ref_block,
283
0
                                  vectorized::Block* new_block) const {
284
0
    std::unique_ptr<RuntimeState> state = RuntimeState::create_unique();
285
0
    state->set_desc_tbl(&_desc_tbl);
286
0
    state->set_be_exec_version(_fe_compatible_version);
287
0
    RowDescriptor row_desc =
288
0
            RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);
289
290
0
    if (_where_expr != nullptr) {
  Branch (290:9): [True: 0, False: 0]
291
0
        vectorized::VExprContextSPtr ctx = nullptr;
292
0
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, ctx));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
293
0
        RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
294
0
        RETURN_IF_ERROR(ctx->open(state.get()));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
295
296
0
        RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
297
0
                vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
298
0
    }
299
300
0
    const int row_num = ref_block->rows();
301
0
    const int new_schema_cols_num = new_block->columns();
302
303
    // will be used for swaping ref_block[entry.first] and new_block[entry.second]
304
0
    std::list<std::pair<int, int>> swap_idx_list;
305
0
    for (int idx = 0; idx < new_schema_cols_num; idx++) {
  Branch (305:23): [True: 0, False: 0]
306
0
        auto expr = _schema_mapping[idx].expr;
307
0
        if (expr != nullptr) {
  Branch (307:13): [True: 0, False: 0]
308
0
            vectorized::VExprContextSPtr ctx;
309
0
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
310
0
            RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
311
0
            RETURN_IF_ERROR(ctx->open(state.get()));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
312
313
0
            int result_tmp_column_idx = -1;
314
0
            RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
315
0
            auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
316
0
            if (result_tmp_column_def.column == nullptr) {
  Branch (316:17): [True: 0, False: 0]
317
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
318
0
                        "result column={} is nullptr, input expr={}", result_tmp_column_def.name,
319
0
                        apache::thrift::ThriftDebugString(*expr));
320
0
            }
321
0
            ref_block->replace_by_position_if_const(result_tmp_column_idx);
322
323
0
            if (result_tmp_column_def.column->size() != row_num) {
  Branch (323:17): [True: 0, False: 0]
324
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>(
325
0
                        "result size invalid, expect={}, real={}; input expr={}", row_num,
326
0
                        result_tmp_column_def.column->size(),
327
0
                        apache::thrift::ThriftDebugString(*expr));
328
0
            }
329
330
0
            if (_type == SCHEMA_CHANGE) {
  Branch (330:17): [True: 0, False: 0]
331
                // danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change
332
                // for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs
333
334
                // column_idx in base schema
335
0
                int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
336
0
                DCHECK_GE(ref_column_idx, 0);
337
0
                auto& ref_column_def = ref_block->get_by_position(ref_column_idx);
338
0
                RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
339
0
                        _check_cast_valid(ref_column_def.column, result_tmp_column_def.column));
340
0
            }
341
0
            swap_idx_list.emplace_back(result_tmp_column_idx, idx);
342
0
        } else if (_schema_mapping[idx].ref_column_idx < 0) {
  Branch (342:20): [True: 0, False: 0]
343
            // new column, write default value
344
0
            auto* value = _schema_mapping[idx].default_value;
345
0
            auto column = new_block->get_by_position(idx).column->assume_mutable();
346
0
            if (value->is_null()) {
  Branch (346:17): [True: 0, False: 0]
347
0
                DCHECK(column->is_nullable());
348
0
                column->insert_many_defaults(row_num);
349
0
            } else {
350
0
                auto type_info = get_type_info(_schema_mapping[idx].new_column);
351
0
                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
352
0
                                                                value->ptr(), column, row_num);
353
0
            }
354
0
        } else {
355
            // same type, just swap column
356
0
            swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
357
0
        }
358
0
    }
359
360
0
    for (auto it : swap_idx_list) {
  Branch (360:18): [True: 0, False: 0]
361
0
        auto& ref_col = ref_block->get_by_position(it.first).column;
362
0
        auto& new_col = new_block->get_by_position(it.second).column;
363
364
0
        bool ref_col_nullable = ref_col->is_nullable();
365
0
        bool new_col_nullable = new_col->is_nullable();
366
367
0
        if (ref_col_nullable != new_col_nullable) {
  Branch (367:13): [True: 0, False: 0]
368
            // not nullable to nullable
369
0
            if (new_col_nullable) {
  Branch (369:17): [True: 0, False: 0]
370
0
                auto* new_nullable_col =
371
0
                        assert_cast<vectorized::ColumnNullable*>(new_col->assume_mutable().get());
372
373
0
                new_nullable_col->change_nested_column(ref_col);
374
0
                new_nullable_col->get_null_map_data().resize_fill(ref_col->size());
375
0
            } else {
376
                // nullable to not nullable:
377
                // suppose column `c_phone` is originally varchar(16) NOT NULL,
378
                // then do schema change `alter table test modify column c_phone int not null`,
379
                // the cast expr of schema change is `CastExpr(CAST String to Nullable(Int32))`,
380
                // so need to handle nullable to not nullable here
381
0
                auto* ref_nullable_col =
382
0
                        assert_cast<vectorized::ColumnNullable*>(ref_col->assume_mutable().get());
383
384
0
                new_col = ref_nullable_col->get_nested_column_ptr();
385
0
            }
386
0
        } else {
387
0
            new_block->get_by_position(it.second).column =
388
0
                    ref_block->get_by_position(it.first).column;
389
0
        }
390
0
    }
391
0
    return Status::OK();
392
0
}
393
394
// This check can prevent schema-change from causing data loss after type cast
395
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
396
0
                                       vectorized::ColumnPtr output_column) {
397
0
    if (input_column->size() != output_column->size()) {
  Branch (397:9): [True: 0, False: 0]
398
0
        return Status::InternalError(
399
0
                "column size is changed, input_column_size={}, output_column_size={}; "
400
0
                "input_column={}",
401
0
                input_column->size(), output_column->size(), input_column->get_name());
402
0
    }
403
0
    DCHECK_EQ(input_column->size(), output_column->size())
404
0
            << "length check should have done before calling this function!";
405
406
0
    if (input_column->is_nullable() != output_column->is_nullable()) {
  Branch (406:9): [True: 0, False: 0]
407
0
        if (input_column->is_nullable()) {
  Branch (407:13): [True: 0, False: 0]
408
0
            const auto* ref_null_map =
409
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
410
0
                            ->get_null_map_column()
411
0
                            .get_data()
412
0
                            .data();
413
414
0
            bool is_changed = false;
415
0
            for (size_t i = 0; i < input_column->size(); i++) {
  Branch (415:32): [True: 0, False: 0]
416
0
                is_changed |= ref_null_map[i];
417
0
            }
418
0
            if (is_changed) {
  Branch (418:17): [True: 0, False: 0]
419
0
                return Status::DataQualityError(
420
0
                        "some null data is changed to not null, intput_column={}",
421
0
                        input_column->get_name());
422
0
            }
423
0
        } else {
424
0
            const auto& null_map_column =
425
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
426
0
                            ->get_null_map_column();
427
0
            const auto& nested_column =
428
0
                    vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
429
0
                            ->get_nested_column();
430
0
            const auto* new_null_map = null_map_column.get_data().data();
431
432
0
            if (null_map_column.size() != output_column->size()) {
  Branch (432:17): [True: 0, False: 0]
433
0
                return Status::InternalError(
434
0
                        "null_map_column size mismatch output_column_size, "
435
0
                        "null_map_column_size={}, output_column_size={}; input_column={}",
436
0
                        null_map_column.size(), output_column->size(), input_column->get_name());
437
0
            }
438
439
0
            if (nested_column.size() != output_column->size()) {
  Branch (439:17): [True: 0, False: 0]
440
0
                return Status::InternalError(
441
0
                        "nested_column size is changed, nested_column_size={}, "
442
0
                        "ouput_column_size={}; input_column={}",
443
0
                        nested_column.size(), output_column->size(), input_column->get_name());
444
0
            }
445
446
0
            bool is_changed = false;
447
0
            for (size_t i = 0; i < input_column->size(); i++) {
  Branch (447:32): [True: 0, False: 0]
448
0
                is_changed |= new_null_map[i];
449
0
            }
450
0
            if (is_changed) {
  Branch (450:17): [True: 0, False: 0]
451
0
                return Status::DataQualityError(
452
0
                        "some not null data is changed to null, intput_column={}",
453
0
                        input_column->get_name());
454
0
            }
455
0
        }
456
0
    }
457
458
0
    if (input_column->is_nullable() && output_column->is_nullable()) {
  Branch (458:9): [True: 0, False: 0]
  Branch (458:40): [True: 0, False: 0]
459
0
        const auto* ref_null_map =
460
0
                vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
461
0
                        ->get_null_map_column()
462
0
                        .get_data()
463
0
                        .data();
464
0
        const auto* new_null_map =
465
0
                vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
466
0
                        ->get_null_map_column()
467
0
                        .get_data()
468
0
                        .data();
469
470
0
        bool is_changed = false;
471
0
        for (size_t i = 0; i < input_column->size(); i++) {
  Branch (471:28): [True: 0, False: 0]
472
0
            is_changed |= (ref_null_map[i] != new_null_map[i]);
473
0
        }
474
0
        if (is_changed) {
  Branch (474:13): [True: 0, False: 0]
475
0
            return Status::DataQualityError(
476
0
                    "null map is changed after calculation, input_column={}",
477
0
                    input_column->get_name());
478
0
        }
479
0
    }
480
0
    return Status::OK();
481
0
}
482
483
Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
484
                                   BaseTabletSPtr new_tablet, BaseTabletSPtr base_tablet,
485
                                   TabletSchemaSPtr base_tablet_schema,
486
0
                                   TabletSchemaSPtr new_tablet_schema) {
487
0
    Status status = rowset_writer->add_rowset_for_linked_schema_change(rowset_reader->rowset());
488
0
    if (!status) {
  Branch (488:9): [True: 0, False: 0]
489
0
        LOG(WARNING) << "fail to convert rowset."
490
0
                     << ", new_tablet=" << new_tablet->tablet_id()
491
0
                     << ", version=" << rowset_writer->version().first << "-"
492
0
                     << rowset_writer->version().second << ", error status " << status;
493
0
        return status;
494
0
    }
495
    // copy delete bitmap to new tablet.
496
0
    if (new_tablet->keys_type() == UNIQUE_KEYS && new_tablet->enable_unique_key_merge_on_write()) {
  Branch (496:9): [True: 0, False: 0]
  Branch (496:51): [True: 0, False: 0]
497
0
        DeleteBitmap origin_delete_bitmap(base_tablet->tablet_id());
498
0
        base_tablet->tablet_meta()->delete_bitmap()->subset(
499
0
                {rowset_reader->rowset()->rowset_id(), 0, 0},
500
0
                {rowset_reader->rowset()->rowset_id(), UINT32_MAX, INT64_MAX},
501
0
                &origin_delete_bitmap);
502
0
        for (auto& iter : origin_delete_bitmap.delete_bitmap) {
  Branch (502:25): [True: 0, False: 0]
503
0
            int ret = new_tablet->tablet_meta()->delete_bitmap()->set(
504
0
                    {rowset_writer->rowset_id(), std::get<1>(iter.first), std::get<2>(iter.first)},
505
0
                    iter.second);
506
0
            DCHECK(ret == 1);
507
0
        }
508
0
    }
509
0
    return Status::OK();
510
0
}
511
512
// Converts one rowset without re-sorting: each block read from `rowset_reader` is passed
// through `_changer` into a block of the new schema and appended to `rowset_writer`.
// Returns the first non-EOF error from the reader, the changer or the writer; otherwise
// returns the status of the final writer flush.
Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                             RowsetWriter* rowset_writer, BaseTabletSPtr new_tablet,
                                             TabletSchemaSPtr base_tablet_schema,
                                             TabletSchemaSPtr new_tablet_schema) {
    bool reader_exhausted = false;
    while (!reader_exhausted) {
        auto converted = vectorized::Block::create_unique(new_tablet_schema->create_block());
        auto source = vectorized::Block::create_unique(base_tablet_schema->create_block());

        auto read_status = rowset_reader->next_block(source.get());
        if (!read_status) {
            if (!read_status.is<ErrorCode::END_OF_FILE>()) {
                return read_status;
            }
            if (source->rows() == 0) {
                // EOF with nothing left to convert.
                break;
            }
            // EOF arrived together with rows: convert this last block, then stop.
            reader_exhausted = true;
        }

        RETURN_IF_ERROR(_changer.change_block(source.get(), converted.get()));
        RETURN_IF_ERROR(rowset_writer->add_block(converted.get()));
    }

    return rowset_writer->flush();
}
541
542
// Stores the block changer and the memory limit, starts the temporary delta version at
// Version::mock(), and creates a memory tracker whose label contains the changer's address.
VBaseSchemaChangeWithSorting::VBaseSchemaChangeWithSorting(const BlockChanger& changer,
                                                           size_t memory_limitation)
        : _changer(changer),
          _memory_limitation(memory_limitation),
          _temp_delta_versions(Version::mock()) {
    // The address is rendered as a decimal integer inside the tracker label.
    const auto changer_address = reinterpret_cast<int64>(&changer);
    _mem_tracker = std::make_unique<MemTracker>(
            fmt::format("VSchemaChangeWithSorting:changer={}", std::to_string(changer_address)));
}
550
551
// Converts one rowset whose rows must be re-sorted for the new schema.
//
// Blocks read from `rowset_reader` are converted by `_changer` and buffered in memory.
// Whenever the tracked memory would exceed `_memory_limitation` (or already exceeds 66% of
// it), the buffered blocks are sorted by `_internal_sorting` into a temporary rowset that
// is appended to `_src_rowsets`. After the reader is drained, the temporary rowsets are
// merged into `rowset_writer` by `_external_sorting`; if none were produced the writer is
// simply flushed.
//
// Returns the first error encountered, or INVALID_ARGUMENT when a single converted block
// does not fit into the memory limit even after the buffer has been emptied.
Status VBaseSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                    RowsetWriter* rowset_writer,
                                                    BaseTabletSPtr new_tablet,
                                                    TabletSchemaSPtr base_tablet_schema,
                                                    TabletSchemaSPtr new_tablet_schema) {
    // Converted blocks waiting for the next internal sort.
    std::vector<std::unique_ptr<vectorized::Block>> pending_blocks;

    RowsetSharedPtr input_rowset = rowset_reader->rowset();
    SegmentsOverlapPB input_overlap = input_rowset->rowset_meta()->segments_overlap();
    int64_t input_write_timestamp = input_rowset->newest_write_timestamp();
    _temp_delta_versions.first = _temp_delta_versions.second;
    _src_rowsets.clear(); // start with no intermediate rowsets

    // Sorts the buffered blocks into one temporary rowset, releases their tracked memory
    // and advances the temporary version. Does nothing when the buffer is empty.
    auto spill_pending_blocks = [&]() -> Status {
        if (pending_blocks.empty()) {
            return Status::OK();
        }

        const Version temp_version(_temp_delta_versions.second, _temp_delta_versions.second + 1);
        auto sorted_rowset = DORIS_TRY(_internal_sorting(
                pending_blocks, temp_version, input_write_timestamp, new_tablet, BETA_ROWSET,
                input_overlap, new_tablet_schema));
        _src_rowsets.push_back(std::move(sorted_rowset));

        for (const auto& buffered : pending_blocks) {
            _mem_tracker->release(buffered->allocated_bytes());
        }
        pending_blocks.clear();

        // increase temp version
        _temp_delta_versions.second += 2;
        return Status::OK();
    };

    auto converted = vectorized::Block::create_unique(new_tablet_schema->create_block());

    bool reader_exhausted = false;
    while (!reader_exhausted) {
        auto source = vectorized::Block::create_unique(base_tablet_schema->create_block());
        auto read_status = rowset_reader->next_block(source.get());
        if (!read_status) {
            if (!read_status.is<ErrorCode::END_OF_FILE>()) {
                return read_status;
            }
            if (source->rows() == 0) {
                break;
            }
            // EOF arrived together with rows: handle this last block, then stop.
            reader_exhausted = true;
        }

        RETURN_IF_ERROR(_changer.change_block(source.get(), converted.get()));

        // Reserve some memory for use by other parts of this job
        constexpr double HOLD_BLOCK_MEMORY_RATE = 0.66;
        const auto converted_bytes = converted->allocated_bytes();
        const bool exceeds_limit =
                _mem_tracker->consumption() + converted_bytes > _memory_limitation;
        const bool exceeds_hold_rate =
                _mem_tracker->consumption() > _memory_limitation * HOLD_BLOCK_MEMORY_RATE;
        if (exceeds_limit || exceeds_hold_rate) {
            RETURN_IF_ERROR(spill_pending_blocks());

            // Re-check against the tracker after the spill.
            if (_mem_tracker->consumption() + converted_bytes > _memory_limitation) {
                return Status::Error<INVALID_ARGUMENT>(
                        "Memory limitation is too small for Schema Change. _memory_limitation={}, "
                        "new_block->allocated_bytes()={}, consumption={}",
                        _memory_limitation, converted_bytes, _mem_tracker->consumption());
            }
        }
        _mem_tracker->consume(converted_bytes);

        // Hand the converted block to the buffer and start over with a fresh empty one.
        pending_blocks.push_back(std::move(converted));
        converted = vectorized::Block::create_unique(new_tablet_schema->create_block());
    }

    RETURN_IF_ERROR(spill_pending_blocks());

    if (_src_rowsets.empty()) {
        return rowset_writer->flush();
    }
    return _external_sorting(_src_rowsets, rowset_writer, new_tablet, new_tablet_schema);
}
636
637
// Merges the in-memory `blocks` through a MultiBlockMerger into a new rowset written
// for `new_tablet` with the given version, overlap flag, schema and write timestamp.
// The number of merged rows reported by the merger is added via `_add_merged_rows`.
// Returns the built rowset, or the error from writer creation, merging or building.
// `new_rowset_type` is not used by this implementation.
Result<RowsetSharedPtr> VBaseSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    MultiBlockMerger merger(new_tablet);

    RowsetWriterContext context;
    context.version = version;
    context.rowset_state = VISIBLE;
    context.segments_overlap = segments_overlap;
    context.tablet_schema = new_tablet_schema;
    context.newest_write_timestamp = newest_write_timestamp;
    context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;

    // TODO(plat1ko): Use monad op
    auto writer_result = new_tablet->create_rowset_writer(context, false);
    if (!writer_result.has_value()) [[unlikely]] {
        return unexpected(std::move(writer_result).error());
    }
    std::unique_ptr<RowsetWriter> rowset_writer = std::move(writer_result).value();

    uint64_t merged_rows = 0;
    RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);

    RowsetSharedPtr sorted_rowset;
    RETURN_IF_ERROR_RESULT(rowset_writer->build(sorted_rowset));
    return sorted_rowset;
}
664
665
// Local-storage variant of the internal sort: merges the in-memory `blocks` into a new
// rowset for `new_tablet`. In addition, the rowset id held in the writer context after the
// writer is created is registered with the storage engine's pending local rowsets, and the
// returned guard is kept in `_pending_rs_guards`.
// Returns the built rowset, or the error from writer creation, merging or building.
// `new_rowset_type` is not used by this implementation.
Result<RowsetSharedPtr> VLocalSchemaChangeWithSorting::_internal_sorting(
        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
        int64_t newest_write_timestamp, BaseTabletSPtr new_tablet, RowsetTypePB new_rowset_type,
        SegmentsOverlapPB segments_overlap, TabletSchemaSPtr new_tablet_schema) {
    MultiBlockMerger merger(new_tablet);

    RowsetWriterContext context;
    context.version = version;
    context.rowset_state = VISIBLE;
    context.segments_overlap = segments_overlap;
    context.tablet_schema = new_tablet_schema;
    context.newest_write_timestamp = newest_write_timestamp;
    context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;

    // TODO(plat1ko): Use monad op
    auto writer_result = new_tablet->create_rowset_writer(context, false);
    if (!writer_result.has_value()) [[unlikely]] {
        return unexpected(std::move(writer_result).error());
    }
    std::unique_ptr<RowsetWriter> rowset_writer = std::move(writer_result).value();

    // Register the rowset id only after the writer has been created from `context`.
    _pending_rs_guards.push_back(
            _local_storage_engine.pending_local_rowsets().add(context.rowset_id));

    uint64_t merged_rows = 0;
    RETURN_IF_ERROR_RESULT(merger.merge(blocks, rowset_writer.get(), &merged_rows));
    _add_merged_rows(merged_rows);

    RowsetSharedPtr sorted_rowset;
    RETURN_IF_ERROR_RESULT(rowset_writer->build(sorted_rowset));
    return sorted_rowset;
}
694
695
// Merges the intermediate rowsets in `src_rowsets` into `rowset_writer`.
// A reader is created for every source rowset, then Merger::vmerge_rowsets is run with
// ReaderType::READER_ALTER_TABLE against `new_tablet_schema`. The merged and filtered row
// counts reported by the merger are added to this object's counters.
// Returns the first error from reader creation or from the merge.
Status VBaseSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                       RowsetWriter* rowset_writer,
                                                       BaseTabletSPtr new_tablet,
                                                       TabletSchemaSPtr new_tablet_schema) {
    std::vector<RowsetReaderSharedPtr> readers;
    for (auto& src_rowset : src_rowsets) {
        RowsetReaderSharedPtr reader;
        RETURN_IF_ERROR(src_rowset->create_reader(&reader));
        readers.push_back(std::move(reader));
    }

    Merger::Statistics merge_stats;
    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, ReaderType::READER_ALTER_TABLE,
                                           *new_tablet_schema, readers, rowset_writer,
                                           &merge_stats));
    _add_merged_rows(merge_stats.merged_rows);
    _add_filtered_rows(merge_stats.filtered_rows);
    return Status::OK();
}
713
714
// Runs the base-class sorting conversion for one rowset on local storage.
// The pending-rowset guards currently held are dropped first. When the function exits,
// every rowset left in `_src_rowsets` (the intermediate rowsets generated by internal
// sorting) is handed to the storage engine as an unused rowset.
Status VLocalSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                     RowsetWriter* rowset_writer,
                                                     BaseTabletSPtr new_tablet,
                                                     TabletSchemaSPtr base_tablet_schema,
                                                     TabletSchemaSPtr new_tablet_schema) {
    _pending_rs_guards.clear();

    // remove the intermediate rowsets generated by internal sorting
    Defer release_intermediate_rowsets {[this]() {
        for (auto& intermediate : _src_rowsets) {
            _local_storage_engine.add_unused_rowset(intermediate);
        }
    }};

    return VBaseSchemaChangeWithSorting::_inner_process(rowset_reader, rowset_writer, new_tablet,
                                                        base_tablet_schema, new_tablet_schema);
}
729
730
0
// Entry point of an alter-tablet (schema change) request.
// Validates the request and the tablets resolved by the constructor, takes the base
// tablet's schema change lock without blocking, and then runs _do_process_alter_tablet.
// Returns INVALID_ARGUMENT when desc_tbl is missing, TABLE_NOT_FOUND when either tablet is
// missing, TRY_LOCK_FAILED when the lock is already held, otherwise the conversion status.
Status SchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
    if (!request.__isset.desc_tbl) {
        return Status::Error<INVALID_ARGUMENT>(
                "desc_tbl is not set. Maybe the FE version is not equal to the BE "
                "version.");
    }
    if (_base_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find base tablet. base_tablet={}",
                                              request.base_tablet_id);
    }
    if (_new_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find new tablet. new_tablet={}",
                                              request.new_tablet_id);
    }

    LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
              << ", new_tablet_id=" << request.new_tablet_id
              << ", alter_version=" << request.alter_version;

    // Hold schema_change_lock until schema change info is stored in tablet header.
    // The lock is only tried once; a job that cannot get it fails immediately.
    std::unique_lock<std::mutex> schema_change_lock(_base_tablet->get_schema_change_lock(),
                                                    std::defer_lock);
    if (!schema_change_lock.try_lock()) {
        return Status::Error<TRY_LOCK_FAILED>("failed to obtain schema change lock. base_tablet={}",
                                              request.base_tablet_id);
    }

    Status alter_status = _do_process_alter_tablet(request);
    LOG(INFO) << "finished alter tablet process, res=" << alter_status;
    DBUG_EXECUTE_IF("SchemaChangeJob::process_alter_tablet.leave.sleep", { sleep(5); });
    return alter_status;
}
762
763
// Resolves the base and new tablets named by `request` from the local storage engine and,
// when both exist, prepares the schemas used by the conversion. When either tablet is
// missing the schemas are left unset; process_alter_tablet() reports the missing tablet.
SchemaChangeJob::SchemaChangeJob(StorageEngine& local_storage_engine,
                                 const TAlterTabletReqV2& request, const std::string& job_id)
        : _local_storage_engine(local_storage_engine) {
    _job_id = job_id;
    _base_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.base_tablet_id);
    _new_tablet = _local_storage_engine.tablet_manager()->get_tablet(request.new_tablet_id);
    if (!_base_tablet || !_new_tablet) {
        return;
    }

    _base_tablet_schema = std::make_shared<TabletSchema>();
    _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
    // The request only carries column info, not bitmap or bloomfilter index info,
    // so the index info is copied from the real base tablet.
    _base_tablet_schema->update_index_info_from(*_base_tablet->tablet_schema());
    // During a schema change, the extracted columns of a variant must not be part of the
    // tablet schema, because the schema change for a variant needs to ignore them.
    // Otherwise the schema types in different rowsets might be inconsistent. The complete
    // variant is constructed by reading all of its sub-columns during the schema change.
    _new_tablet_schema = _new_tablet->tablet_schema()->copy_without_variant_extracted_columns();
}
782
783
// In the past, schema change and rollup would create a new tablet and wait for txns that started before the task to finish.
784
// It will cost a lot of time to wait and the task is very difficult to understand.
785
// In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
786
// The admin should upgrade all BE and then upgrade FE.
787
// Should delete the old code after upgrade finished.
788
0
Status SchemaChangeJob::_do_process_alter_tablet(const TAlterTabletReqV2& request) {
789
0
    DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); })
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
790
0
    Status res;
791
0
    signal::tablet_id = _base_tablet->get_table_id();
792
793
    // check if tablet's state is not_ready, if it is ready, it means the tablet already finished
794
    // check whether the tablet's max continuous version == request.version
795
0
    if (_new_tablet->tablet_state() != TABLET_NOTREADY) {
  Branch (795:9): [True: 0, False: 0]
796
0
        res = _validate_alter_result(request);
797
0
        LOG(INFO) << "tablet's state=" << _new_tablet->tablet_state()
798
0
                  << " the convert job already finished, check its version"
799
0
                  << " res=" << res;
800
0
        return res;
801
0
    }
802
0
    _new_tablet->set_alter_failed(false);
803
0
    Defer defer([this] {
804
        // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
805
0
        if (_new_tablet->tablet_state() != TABLET_RUNNING) {
  Branch (805:13): [True: 0, False: 0]
806
0
            _new_tablet->set_alter_failed(true);
807
0
        }
808
0
    });
809
810
0
    LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
811
0
                 "to new tablet"
812
0
              << " base_tablet=" << _base_tablet->tablet_id()
813
0
              << " new_tablet=" << _new_tablet->tablet_id();
814
815
0
    std::shared_lock base_migration_rlock(_base_tablet->get_migration_lock(), std::try_to_lock);
816
0
    if (!base_migration_rlock.owns_lock()) {
  Branch (816:9): [True: 0, False: 0]
817
0
        return Status::Error<TRY_LOCK_FAILED>(
818
0
                "SchemaChangeJob::_do_process_alter_tablet get lock failed");
819
0
    }
820
0
    std::shared_lock new_migration_rlock(_new_tablet->get_migration_lock(), std::try_to_lock);
821
0
    if (!new_migration_rlock.owns_lock()) {
  Branch (821:9): [True: 0, False: 0]
822
0
        return Status::Error<TRY_LOCK_FAILED>(
823
0
                "SchemaChangeJob::_do_process_alter_tablet get lock failed");
824
0
    }
825
826
0
    std::vector<Version> versions_to_be_changed;
827
0
    int64_t end_version = -1;
828
    // reader_context is stack variables, it's lifetime should keep the same
829
    // with rs_readers
830
0
    RowsetReaderContext reader_context;
831
0
    std::vector<RowSetSplits> rs_splits;
832
    // delete handlers for new tablet
833
0
    DeleteHandler delete_handler;
834
0
    std::vector<ColumnId> return_columns;
835
836
    // Use tablet schema directly from base tablet, they are the newest schema, not contain
837
    // dropped column during light weight schema change.
838
    // But the tablet schema in base tablet maybe not the latest from FE, so that if fe pass through
839
    // a tablet schema, then use request schema.
840
0
    size_t num_cols =
841
0
            request.columns.empty() ? _base_tablet_schema->num_columns() : request.columns.size();
  Branch (841:13): [True: 0, False: 0]
842
0
    return_columns.resize(num_cols);
843
0
    for (int i = 0; i < num_cols; ++i) {
  Branch (843:21): [True: 0, False: 0]
844
0
        return_columns[i] = i;
845
0
    }
846
847
0
    DBUG_EXECUTE_IF("SchemaChangeJob::_do_process_alter_tablet.block", DBUG_BLOCK);
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
  Branch (41:15): [True: 0, False: 0]
42
0
        }                                                                     \
43
0
    }
848
849
    // begin to find deltas to convert from base tablet to new tablet so that
850
    // obtain base tablet and new tablet's push lock and header write lock to prevent loading data
851
0
    {
852
0
        std::lock_guard base_tablet_lock(_base_tablet->get_push_lock());
853
0
        std::lock_guard new_tablet_lock(_new_tablet->get_push_lock());
854
0
        std::lock_guard base_tablet_wlock(_base_tablet->get_header_lock());
855
0
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
Line
Count
Source
31
0
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
0
    using namespace std::chrono_literals;                                               \
42
0
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
0
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
0
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
0
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
0
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
0
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
0
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
0
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
0
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
0
        }                                                                               \
51
0
    })
856
0
        std::lock_guard<std::shared_mutex> new_tablet_wlock(_new_tablet->get_header_lock());
857
858
0
        do {
859
0
            RowsetSharedPtr max_rowset;
860
            // get history data to be converted and it will check if there is a hole in the base tablet
861
0
            res = _get_versions_to_be_changed(&versions_to_be_changed, &max_rowset);
862
0
            if (!res) {
  Branch (862:17): [True: 0, False: 0]
863
0
                LOG(WARNING) << "fail to get version to be changed. res=" << res;
864
0
                break;
865
0
            }
866
867
0
            DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
868
0
                res = Status::InternalError(
869
0
                        "inject alter tablet failed. base_tablet={}, new_tablet={}",
870
0
                        request.base_tablet_id, request.new_tablet_id);
871
0
                LOG(WARNING) << "inject error. res=" << res;
872
0
                break;
873
0
            });
874
875
            // should check the max_version >= request.alter_version, if not the convert is useless
876
0
            if (max_rowset == nullptr || max_rowset->end_version() < request.alter_version) {
  Branch (876:17): [True: 0, False: 0]
  Branch (876:42): [True: 0, False: 0]
877
0
                res = Status::InternalError(
878
0
                        "base tablet's max version={} is less than request version={}",
879
0
                        (max_rowset == nullptr ? 0 : max_rowset->end_version()),
  Branch (879:26): [True: 0, False: 0]
880
0
                        request.alter_version);
881
0
                break;
882
0
            }
883
            // before calculating version_to_be_changed,
884
            // remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
885
0
            LOG(INFO) << "begin to remove all data before end version from new tablet to prevent "
886
0
                         "rewrite."
887
0
                      << " new_tablet=" << _new_tablet->tablet_id()
888
0
                      << ", end_version=" << max_rowset->end_version();
889
0
            std::vector<RowsetSharedPtr> rowsets_to_delete;
890
0
            std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
891
0
            _new_tablet->acquire_version_and_rowsets(&version_rowsets);
892
0
            std::sort(version_rowsets.begin(), version_rowsets.end(),
893
0
                      [](const std::pair<Version, RowsetSharedPtr>& l,
894
0
                         const std::pair<Version, RowsetSharedPtr>& r) {
895
0
                          return l.first.first < r.first.first;
896
0
                      });
897
0
            for (auto& pair : version_rowsets) {
  Branch (897:29): [True: 0, False: 0]
898
0
                if (pair.first.second <= max_rowset->end_version()) {
  Branch (898:21): [True: 0, False: 0]
899
0
                    rowsets_to_delete.push_back(pair.second);
900
0
                } else if (pair.first.first <= max_rowset->end_version()) {
  Branch (900:28): [True: 0, False: 0]
901
                    // If max version is [X-10] and new tablet has version [7-9][10-12],
902
                    // we only can remove [7-9] from new tablet. If we add [X-10] to new tablet, it will has version
903
                    // cross: [X-10] [10-12].
904
                    // So, we should return OLAP_ERR_VERSION_ALREADY_MERGED for fast fail.
905
0
                    return Status::Error<VERSION_ALREADY_MERGED>(
906
0
                            "New tablet has a version {} crossing base tablet's max_version={}",
907
0
                            pair.first.to_string(), max_rowset->end_version());
908
0
                }
909
0
            }
910
0
            std::vector<RowsetSharedPtr> empty_vec;
911
0
            RETURN_IF_ERROR(_new_tablet->delete_rowsets(rowsets_to_delete, false));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
912
            // inherit cumulative_layer_point from base_tablet
913
            // check if new_tablet.ce_point > base_tablet.ce_point?
914
0
            _new_tablet->set_cumulative_layer_point(-1);
915
            // save tablet meta
916
0
            _new_tablet->save_meta();
917
0
            for (auto& rowset : rowsets_to_delete) {
  Branch (917:31): [True: 0, False: 0]
918
                // do not call rowset.remove directly, using gc thread to delete it
919
0
                _local_storage_engine.add_unused_rowset(rowset);
920
0
            }
921
922
            // init one delete handler
923
0
            for (auto& version : versions_to_be_changed) {
  Branch (923:32): [True: 0, False: 0]
924
0
                end_version = std::max(end_version, version.second);
925
0
            }
926
927
            // acquire data sources correspond to history versions
928
0
            RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
929
0
                    _base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
930
0
            if (rs_splits.empty()) {
  Branch (930:17): [True: 0, False: 0]
931
0
                res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
932
0
                        "fail to acquire all data sources. version_num={}, data_source_num={}",
933
0
                        versions_to_be_changed.size(), rs_splits.size());
934
0
                break;
935
0
            }
936
0
            std::vector<RowsetMetaSharedPtr> del_preds;
937
0
            for (auto&& split : rs_splits) {
  Branch (937:31): [True: 0, False: 0]
938
0
                const auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
939
0
                if (!rs_meta->has_delete_predicate() || rs_meta->start_version() > end_version) {
  Branch (939:21): [True: 0, False: 0]
  Branch (939:57): [True: 0, False: 0]
940
0
                    continue;
941
0
                }
942
0
                _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
943
0
                del_preds.push_back(rs_meta);
944
0
            }
945
0
            res = delete_handler.init(_base_tablet_schema, del_preds, end_version);
946
0
            if (!res) {
  Branch (946:17): [True: 0, False: 0]
947
0
                LOG(WARNING) << "init delete handler failed. base_tablet="
948
0
                             << _base_tablet->tablet_id() << ", end_version=" << end_version;
949
0
                break;
950
0
            }
951
952
0
            reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
953
0
            reader_context.tablet_schema = _base_tablet_schema;
954
0
            reader_context.need_ordered_result = true;
955
0
            reader_context.delete_handler = &delete_handler;
956
0
            reader_context.return_columns = &return_columns;
957
0
            reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
958
0
            reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
959
0
            reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
960
0
            reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap();
961
0
            reader_context.version = Version(0, end_version);
962
0
            for (auto& rs_split : rs_splits) {
  Branch (962:33): [True: 0, False: 0]
963
0
                res = rs_split.rs_reader->init(&reader_context);
964
0
                if (!res) {
  Branch (964:21): [True: 0, False: 0]
965
0
                    LOG(WARNING) << "failed to init rowset reader: " << _base_tablet->tablet_id();
966
0
                    break;
967
0
                }
968
0
            }
969
0
        } while (false);
  Branch (969:18): [Folded - Ignored]
970
0
    }
971
972
0
    do {
973
0
        if (!res) {
  Branch (973:13): [True: 0, False: 0]
974
0
            break;
975
0
        }
976
0
        SchemaChangeParams sc_params;
977
978
0
        RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
979
0
                DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
980
0
        sc_params.ref_rowset_readers.reserve(rs_splits.size());
981
0
        for (RowSetSplits& split : rs_splits) {
  Branch (981:34): [True: 0, False: 0]
982
0
            sc_params.ref_rowset_readers.emplace_back(split.rs_reader);
983
0
        }
984
0
        sc_params.delete_handler = &delete_handler;
985
0
        sc_params.be_exec_version = request.be_exec_version;
986
0
        DCHECK(request.__isset.alter_tablet_type);
987
0
        switch (request.alter_tablet_type) {
  Branch (987:17): [True: 0, False: 0]
988
0
        case TAlterTabletType::SCHEMA_CHANGE:
  Branch (988:9): [True: 0, False: 0]
989
0
            sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
990
0
            break;
991
0
        case TAlterTabletType::ROLLUP:
  Branch (991:9): [True: 0, False: 0]
992
0
            sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
993
0
            break;
994
0
        case TAlterTabletType::MIGRATION:
  Branch (994:9): [True: 0, False: 0]
995
0
            sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
996
0
            break;
997
0
        }
998
0
        if (request.__isset.materialized_view_params) {
  Branch (998:13): [True: 0, False: 0]
999
0
            for (auto item : request.materialized_view_params) {
  Branch (999:28): [True: 0, False: 0]
1000
0
                AlterMaterializedViewParam mv_param;
1001
0
                mv_param.column_name = item.column_name;
1002
1003
0
                if (item.__isset.mv_expr) {
  Branch (1003:21): [True: 0, False: 0]
1004
0
                    mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
1005
0
                }
1006
0
                sc_params.materialized_params_map.insert(
1007
0
                        std::make_pair(to_lower(item.column_name), mv_param));
1008
0
            }
1009
0
        }
1010
0
        {
1011
0
            std::lock_guard<std::shared_mutex> wrlock(_mutex);
1012
0
            _tablet_ids_in_converting.insert(_new_tablet->tablet_id());
1013
0
        }
1014
0
        int64_t real_alter_version = 0;
1015
0
        sc_params.enable_unique_key_merge_on_write =
1016
0
                _new_tablet->enable_unique_key_merge_on_write();
1017
0
        res = _convert_historical_rowsets(sc_params, &real_alter_version);
1018
0
        {
1019
0
            std::lock_guard<std::shared_mutex> wrlock(_mutex);
1020
0
            _tablet_ids_in_converting.erase(_new_tablet->tablet_id());
1021
0
        }
1022
0
        if (!res) {
  Branch (1022:13): [True: 0, False: 0]
1023
0
            break;
1024
0
        }
1025
1026
0
        DCHECK_GE(real_alter_version, request.alter_version);
1027
1028
0
        if (_new_tablet->keys_type() == UNIQUE_KEYS &&
  Branch (1028:13): [True: 0, False: 0]
1029
0
            _new_tablet->enable_unique_key_merge_on_write()) {
  Branch (1029:13): [True: 0, False: 0]
1030
0
            res = _calc_delete_bitmap_for_mow_table(real_alter_version);
1031
0
            if (!res) {
  Branch (1031:17): [True: 0, False: 0]
1032
0
                break;
1033
0
            }
1034
0
        } else {
1035
            // set state to ready
1036
0
            std::lock_guard<std::shared_mutex> new_wlock(_new_tablet->get_header_lock());
1037
0
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
Line
Count
Source
31
0
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
0
    using namespace std::chrono_literals;                                               \
42
0
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
0
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
0
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
0
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
0
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
0
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
0
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
0
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
0
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
0
        }                                                                               \
51
0
    })
1038
0
            res = _new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
1039
0
            if (!res) {
  Branch (1039:17): [True: 0, False: 0]
1040
0
                break;
1041
0
            }
1042
0
            _new_tablet->save_meta();
1043
0
        }
1044
0
    } while (false);
  Branch (1044:14): [Folded - Ignored]
1045
1046
0
    if (res) {
  Branch (1046:9): [True: 0, False: 0]
1047
        // _validate_alter_result should be outside the above while loop.
1048
        // to avoid requiring the header lock twice.
1049
0
        res = _validate_alter_result(request);
1050
0
    }
1051
1052
    // if failed convert history data, then just remove the new tablet
1053
0
    if (!res) {
  Branch (1053:9): [True: 0, False: 0]
1054
0
        LOG(WARNING) << "failed to alter tablet. base_tablet=" << _base_tablet->tablet_id()
1055
0
                     << ", drop new_tablet=" << _new_tablet->tablet_id();
1056
        // do not drop the new tablet and its data. GC thread will
1057
0
    }
1058
1059
0
    return res;
1060
0
}
1061
1062
0
bool SchemaChangeJob::tablet_in_converting(int64_t tablet_id) {
1063
0
    std::shared_lock rdlock(_mutex);
1064
0
    return _tablet_ids_in_converting.find(tablet_id) != _tablet_ids_in_converting.end();
1065
0
}
1066
1067
// Collects the versions of the base tablet that the schema change has to convert.
//
// @param versions_to_be_changed  out: the consistent version path covering
//                                [0, max_version] of the base tablet.
// @param max_rowset              out: the base tablet's rowset with the max version.
// @return ALTER_DELTA_DOES_NOT_EXISTS when the base tablet has no rowset, the
//         error of capture_consistent_versions_unlocked() when it fails,
//         otherwise OK.
Status SchemaChangeJob::_get_versions_to_be_changed(std::vector<Version>* versions_to_be_changed,
                                                    RowsetSharedPtr* max_rowset) {
    RowsetSharedPtr newest_rowset = _base_tablet->get_rowset_with_max_version();
    if (!newest_rowset) {
        return Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>("Tablet has no version. base_tablet={}",
                                                          _base_tablet->tablet_id());
    }
    *max_rowset = newest_rowset;

    // DORIS_TRY returns the error from this function if no consistent path exists.
    *versions_to_be_changed = DORIS_TRY(_base_tablet->capture_consistent_versions_unlocked(
            Version(0, newest_rowset->version().second), {}));
    return Status::OK();
}
1079
1080
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
1081
// converted from a base tablet, only used for the mow table now.
1082
Status SchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
1083
0
                                                    int64_t* real_alter_version) {
1084
0
    LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
1085
0
              << " base_tablet=" << _base_tablet->tablet_id()
1086
0
              << ", new_tablet=" << _new_tablet->tablet_id() << ", job_id=" << _job_id;
1087
1088
    // find end version
1089
0
    int32_t end_version = -1;
1090
0
    for (const auto& ref_rowset_reader : sc_params.ref_rowset_readers) {
  Branch (1090:40): [True: 0, False: 0]
1091
0
        if (ref_rowset_reader->version().second > end_version) {
  Branch (1091:13): [True: 0, False: 0]
1092
0
            end_version = ref_rowset_reader->version().second;
1093
0
        }
1094
0
    }
1095
1096
    // Add filter information in change, and filter column information will be set in parse_request
1097
    // And filter some data every time the row block changes
1098
0
    BlockChanger changer(_new_tablet_schema, *sc_params.desc_tbl);
1099
1100
0
    bool sc_sorting = false;
1101
0
    bool sc_directly = false;
1102
1103
    // a.Parse the Alter request and convert it into an internal representation
1104
0
    Status res = parse_request(sc_params, _base_tablet_schema.get(), _new_tablet_schema.get(),
1105
0
                               &changer, &sc_sorting, &sc_directly);
1106
0
    LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
1107
0
              << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id()
1108
0
              << ", new_tablet=" << _new_tablet->tablet_id();
1109
1110
0
    auto process_alter_exit = [&]() -> Status {
1111
0
        {
1112
            // save tablet meta here because rowset meta is not saved during add rowset
1113
0
            std::lock_guard new_wlock(_new_tablet->get_header_lock());
1114
0
            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
Line
Count
Source
31
0
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
0
    using namespace std::chrono_literals;                                               \
42
0
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
0
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
0
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
0
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
0
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
0
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
0
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
0
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
0
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
0
        }                                                                               \
51
0
    })
1115
0
            _new_tablet->save_meta();
1116
0
        }
1117
0
        if (res) {
  Branch (1117:13): [True: 0, False: 0]
1118
0
            Version test_version(0, end_version);
1119
0
            res = _new_tablet->check_version_integrity(test_version);
1120
0
        }
1121
1122
0
        LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
1123
0
                  << "base_tablet=" << _base_tablet->tablet_id()
1124
0
                  << ", new_tablet=" << _new_tablet->tablet_id();
1125
0
        return res;
1126
0
    };
1127
1128
0
    if (!res) {
  Branch (1128:9): [True: 0, False: 0]
1129
0
        LOG(WARNING) << "failed to parse the request. res=" << res;
1130
0
        return process_alter_exit();
1131
0
    }
1132
1133
0
    if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) {
  Branch (1133:9): [True: 0, False: 0]
  Branch (1133:24): [True: 0, False: 0]
  Branch (1133:40): [True: 0, False: 0]
1134
0
        res = Status::Error<SCHEMA_SCHEMA_INVALID>(
1135
0
                "Don't support to add materialized view by linked schema change");
1136
0
        return process_alter_exit();
1137
0
    }
1138
1139
    // b. Generate historical data converter
1140
0
    auto sc_procedure = _get_sc_procedure(
1141
0
            changer, sc_sorting, sc_directly,
1142
0
            _local_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
1143
1144
0
    DBUG_EXECUTE_IF("SchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK);
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
  Branch (41:15): [True: 0, False: 0]
42
0
        }                                                                     \
43
0
    }
1145
1146
    // c.Convert historical data
1147
0
    bool have_failure_rowset = false;
1148
0
    for (const auto& rs_reader : sc_params.ref_rowset_readers) {
  Branch (1148:32): [True: 0, False: 0]
1149
        // set status for monitor
1150
        // As long as there is a new_table as running, ref table is set as running
1151
        // NOTE If the first sub_table fails first, it will continue to go as normal here
1152
        // When tablet create new rowset writer, it may change rowset type, in this case
1153
        // linked schema change will not be used.
1154
0
        RowsetWriterContext context;
1155
0
        context.version = rs_reader->version();
1156
0
        context.rowset_state = VISIBLE;
1157
0
        context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
1158
0
        context.tablet_schema = _new_tablet_schema;
1159
0
        context.newest_write_timestamp = rs_reader->newest_write_timestamp();
1160
1161
0
        if (!rs_reader->rowset()->is_local()) {
  Branch (1161:13): [True: 0, False: 0]
1162
0
            context.storage_resource =
1163
0
                    *DORIS_TRY(rs_reader->rowset()->rowset_meta()->remote_storage_resource());
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
1164
0
        }
1165
1166
0
        context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
1167
0
        auto result = _new_tablet->create_rowset_writer(context, false);
1168
0
        if (!result.has_value()) {
  Branch (1168:13): [True: 0, False: 0]
1169
0
            res = Status::Error<ROWSET_BUILDER_INIT>("create_rowset_writer failed, reason={}",
1170
0
                                                     result.error().to_string());
1171
0
            return process_alter_exit();
1172
0
        }
1173
0
        auto rowset_writer = std::move(result).value();
1174
0
        auto pending_rs_guard = _local_storage_engine.add_pending_rowset(context);
1175
1176
0
        if (res = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet,
1177
0
                                        _base_tablet_schema, _new_tablet_schema);
1178
0
            !res) {
  Branch (1178:13): [True: 0, False: 0]
1179
0
            LOG(WARNING) << "failed to process the version."
1180
0
                         << " version=" << rs_reader->version().first << "-"
1181
0
                         << rs_reader->version().second << ", " << res.to_string();
1182
0
            return process_alter_exit();
1183
0
        }
1184
        // Add the new version of the data to the header
1185
        // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
1186
0
        std::lock_guard lock(_new_tablet->get_push_lock());
1187
0
        RowsetSharedPtr new_rowset;
1188
0
        if (!(res = rowset_writer->build(new_rowset)).ok()) {
  Branch (1188:13): [True: 0, False: 0]
1189
0
            LOG(WARNING) << "failed to build rowset, exit alter process";
1190
0
            return process_alter_exit();
1191
0
        }
1192
0
        res = _new_tablet->add_rowset(new_rowset);
1193
0
        if (res.is<PUSH_VERSION_ALREADY_EXIST>()) {
  Branch (1193:13): [True: 0, False: 0]
1194
0
            LOG(WARNING) << "version already exist, version revert occurred. "
1195
0
                         << "tablet=" << _new_tablet->tablet_id() << ", version='"
1196
0
                         << rs_reader->version().first << "-" << rs_reader->version().second;
1197
0
            _local_storage_engine.add_unused_rowset(new_rowset);
1198
0
            have_failure_rowset = true;
1199
0
            res = Status::OK();
1200
0
        } else if (!res) {
  Branch (1200:20): [True: 0, False: 0]
1201
0
            LOG(WARNING) << "failed to register new version. "
1202
0
                         << " tablet=" << _new_tablet->tablet_id()
1203
0
                         << ", version=" << rs_reader->version().first << "-"
1204
0
                         << rs_reader->version().second;
1205
0
            _local_storage_engine.add_unused_rowset(new_rowset);
1206
0
            return process_alter_exit();
1207
0
        } else {
1208
0
            VLOG_NOTICE << "register new version. tablet=" << _new_tablet->tablet_id()
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
1209
0
                        << ", version=" << rs_reader->version().first << "-"
1210
0
                        << rs_reader->version().second;
1211
0
        }
1212
0
        if (!have_failure_rowset) {
  Branch (1212:13): [True: 0, False: 0]
1213
0
            *real_alter_version = rs_reader->version().second;
1214
0
        }
1215
1216
0
        VLOG_TRACE << "succeed to convert a history version."
Line
Count
Source
40
0
#define VLOG_TRACE VLOG(10)
1217
0
                   << " version=" << rs_reader->version().first << "-"
1218
0
                   << rs_reader->version().second;
1219
0
    }
1220
1221
    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
1222
0
    return process_alter_exit();
1223
0
}
1224
1225
// Lower-cased form of "__DORIS_WHERE_SIGN__". parse_request() uses it as the key
// into the materialized-view parameter map to find the where expression.
static const std::string WHERE_SIGN_LOWER = to_lower("__DORIS_WHERE_SIGN__");
1226
1227
// @static
1228
// Analyze the mapping of the column and the mapping of the filter key
1229
Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
1230
                                      TabletSchema* base_tablet_schema,
1231
                                      TabletSchema* new_tablet_schema, BlockChanger* changer,
1232
0
                                      bool* sc_sorting, bool* sc_directly) {
1233
0
    changer->set_type(sc_params.alter_tablet_type);
1234
0
    changer->set_compatible_version(sc_params.be_exec_version);
1235
1236
0
    const std::unordered_map<std::string, AlterMaterializedViewParam>& materialized_function_map =
1237
0
            sc_params.materialized_params_map;
1238
0
    DescriptorTbl desc_tbl = *sc_params.desc_tbl;
1239
1240
    // set column mapping
1241
0
    for (int i = 0, new_schema_size = new_tablet_schema->num_columns(); i < new_schema_size; ++i) {
  Branch (1241:73): [True: 0, False: 0]
1242
0
        const TabletColumn& new_column = new_tablet_schema->column(i);
1243
0
        const std::string& column_name_lower = to_lower(new_column.name());
1244
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1245
0
        column_mapping->new_column = &new_column;
1246
1247
0
        column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());
1248
1249
0
        if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) {
  Branch (1249:13): [True: 0, False: 0]
1250
0
            auto mv_param = materialized_function_map.find(column_name_lower)->second;
1251
0
            column_mapping->expr = mv_param.expr;
1252
0
            if (column_mapping->expr != nullptr) {
  Branch (1252:17): [True: 0, False: 0]
1253
0
                continue;
1254
0
            }
1255
0
        }
1256
1257
0
        if (column_mapping->ref_column_idx >= 0) {
  Branch (1257:13): [True: 0, False: 0]
1258
0
            continue;
1259
0
        }
1260
1261
0
        if (sc_params.alter_tablet_type == ROLLUP) {
  Branch (1261:13): [True: 0, False: 0]
1262
0
            std::string materialized_function_map_str;
1263
0
            for (auto str : materialized_function_map) {
  Branch (1263:27): [True: 0, False: 0]
1264
0
                if (!materialized_function_map_str.empty()) {
  Branch (1264:21): [True: 0, False: 0]
1265
0
                    materialized_function_map_str += ',';
1266
0
                }
1267
0
                materialized_function_map_str += str.first;
1268
0
            }
1269
0
            return Status::InternalError(
1270
0
                    "referenced column was missing. [column={},materialized_function_map={}]",
1271
0
                    new_column.name(), materialized_function_map_str);
1272
0
        }
1273
1274
0
        if (new_column.name().find("__doris_shadow_") == 0) {
  Branch (1274:13): [True: 0, False: 0]
1275
            // Should delete in the future, just a protection for bug.
1276
0
            LOG(INFO) << "a shadow column is encountered " << new_column.name();
1277
0
            return Status::InternalError("failed due to operate on shadow column");
1278
0
        }
1279
        // Newly added column go here
1280
0
        column_mapping->ref_column_idx = -1;
1281
1282
0
        if (i < base_tablet_schema->num_short_key_columns()) {
  Branch (1282:13): [True: 0, False: 0]
1283
0
            *sc_directly = true;
1284
0
        }
1285
0
        RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1286
0
                _init_column_mapping(column_mapping, new_column, new_column.default_value()));
1287
1288
0
        LOG(INFO) << "A column with default value will be added after schema changing. "
1289
0
                  << "column=" << new_column.name()
1290
0
                  << ", default_value=" << new_column.default_value();
1291
0
    }
1292
1293
0
    if (materialized_function_map.contains(WHERE_SIGN_LOWER)) {
  Branch (1293:9): [True: 0, False: 0]
1294
0
        changer->set_where_expr(materialized_function_map.find(WHERE_SIGN_LOWER)->second.expr);
1295
0
    }
1296
1297
    // If the reference sequence of the Key column is out of order, it needs to be reordered
1298
0
    int num_default_value = 0;
1299
1300
0
    for (int i = 0, new_schema_size = new_tablet_schema->num_key_columns(); i < new_schema_size;
  Branch (1300:77): [True: 0, False: 0]
1301
0
         ++i) {
1302
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1303
1304
0
        if (!column_mapping->has_reference()) {
  Branch (1304:13): [True: 0, False: 0]
1305
0
            num_default_value++;
1306
0
            continue;
1307
0
        }
1308
1309
0
        if (column_mapping->ref_column_idx != i - num_default_value) {
  Branch (1309:13): [True: 0, False: 0]
1310
0
            *sc_sorting = true;
1311
0
            return Status::OK();
1312
0
        }
1313
0
    }
1314
1315
0
    if (base_tablet_schema->keys_type() != new_tablet_schema->keys_type()) {
  Branch (1315:9): [True: 0, False: 0]
1316
        // only when base table is dup and mv is agg
1317
        // the rollup job must be reagg.
1318
0
        *sc_sorting = true;
1319
0
        return Status::OK();
1320
0
    }
1321
1322
    // If the sort of key has not been changed but the new keys num is less then base's,
1323
    // the new table should be re agg.
1324
    // So we also need to set sc_sorting = true.
1325
    // A, B, C are keys(sort keys), D is value
1326
    // followings need resort:
1327
    //      old keys:    A   B   C   D
1328
    //      new keys:    A   B
1329
0
    if (new_tablet_schema->keys_type() != KeysType::DUP_KEYS &&
  Branch (1329:9): [True: 0, False: 0]
1330
0
        new_tablet_schema->num_key_columns() < base_tablet_schema->num_key_columns()) {
  Branch (1330:9): [True: 0, False: 0]
1331
        // this is a table with aggregate key type, and num of key columns in new schema
1332
        // is less, which means the data in new tablet should be more aggregated.
1333
        // so we use sorting schema change to sort and merge the data.
1334
0
        *sc_sorting = true;
1335
0
        return Status::OK();
1336
0
    }
1337
1338
0
    if (sc_params.alter_tablet_type == ROLLUP) {
  Branch (1338:9): [True: 0, False: 0]
1339
0
        *sc_directly = true;
1340
0
        return Status::OK();
1341
0
    }
1342
1343
0
    if (sc_params.enable_unique_key_merge_on_write &&
  Branch (1343:9): [True: 0, False: 0]
1344
0
        new_tablet_schema->num_key_columns() > base_tablet_schema->num_key_columns()) {
  Branch (1344:9): [True: 0, False: 0]
1345
0
        *sc_directly = true;
1346
0
        return Status::OK();
1347
0
    }
1348
1349
0
    if (base_tablet_schema->num_short_key_columns() != new_tablet_schema->num_short_key_columns()) {
  Branch (1349:9): [True: 0, False: 0]
1350
        // the number of short_keys changed, can't do linked schema change
1351
0
        *sc_directly = true;
1352
0
        return Status::OK();
1353
0
    }
1354
1355
0
    if (!sc_params.delete_handler->empty()) {
  Branch (1355:9): [True: 0, False: 0]
1356
        // there exists delete condition in header, can't do linked schema change
1357
0
        *sc_directly = true;
1358
0
        return Status::OK();
1359
0
    }
1360
1361
    // if new tablet enable row store, or new tablet has different row store columns
1362
0
    if ((!base_tablet_schema->exist_column(BeConsts::ROW_STORE_COL) &&
  Branch (1362:10): [True: 0, False: 0]
1363
0
         new_tablet_schema->exist_column(BeConsts::ROW_STORE_COL)) ||
  Branch (1363:10): [True: 0, False: 0]
1364
0
        !std::equal(new_tablet_schema->row_columns_uids().begin(),
  Branch (1364:9): [True: 0, False: 0]
1365
0
                    new_tablet_schema->row_columns_uids().end(),
1366
0
                    base_tablet_schema->row_columns_uids().begin(),
1367
0
                    base_tablet_schema->row_columns_uids().end())) {
1368
0
        *sc_directly = true;
1369
0
    }
1370
1371
0
    for (size_t i = 0; i < new_tablet_schema->num_columns(); ++i) {
  Branch (1371:24): [True: 0, False: 0]
1372
0
        ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
1373
0
        if (column_mapping->expr != nullptr) {
  Branch (1373:13): [True: 0, False: 0]
1374
0
            *sc_directly = true;
1375
0
            return Status::OK();
1376
0
        } else if (column_mapping->ref_column_idx >= 0) {
  Branch (1376:20): [True: 0, False: 0]
1377
            // index changed
1378
0
            if (vectorized::schema_util::has_schema_index_diff(
  Branch (1378:17): [True: 0, False: 0]
1379
0
                        new_tablet_schema, base_tablet_schema, i, column_mapping->ref_column_idx)) {
1380
0
                *sc_directly = true;
1381
0
                return Status::OK();
1382
0
            }
1383
0
        }
1384
0
    }
1385
1386
    // if rs_reader has remote files, link schema change is not supported,
1387
    // use directly schema change instead.
1388
0
    if (!(*sc_directly) && !(*sc_sorting)) {
  Branch (1388:9): [True: 0, False: 0]
  Branch (1388:28): [True: 0, False: 0]
1389
        // check has remote rowset
1390
        // work for cloud and cold storage
1391
0
        for (const auto& rs_reader : sc_params.ref_rowset_readers) {
  Branch (1391:36): [True: 0, False: 0]
1392
0
            if (!rs_reader->rowset()->is_local()) {
  Branch (1392:17): [True: 0, False: 0]
1393
0
                *sc_directly = true;
1394
0
                break;
1395
0
            }
1396
0
        }
1397
0
    }
1398
1399
0
    return Status::OK();
1400
0
}
1401
1402
// Builds the default value for a newly added column and stores it in
// `column_mapping->default_value`.
//
// @param column_mapping  mapping whose `default_value` is (re)assigned.
// @param column_schema   schema of the new column; drives field creation,
//                        nullability, precision and frac.
// @param value           textual default value; an empty string on a nullable
//                        column yields a NULL default.
// @return the error of WrapperField::create() or from_string() on failure,
//         otherwise OK.
Status SchemaChangeJob::_init_column_mapping(ColumnMapping* column_mapping,
                                             const TabletColumn& column_schema,
                                             const std::string& value) {
    auto created_field = WrapperField::create(column_schema);
    if (!created_field.has_value()) {
        return created_field.error();
    }
    column_mapping->default_value = created_field.value();

    const bool default_is_null = column_schema.is_nullable() && value.length() == 0;
    if (default_is_null) {
        column_mapping->default_value->set_null();
        return Status::OK();
    }

    RETURN_IF_ERROR(column_mapping->default_value->from_string(value, column_schema.precision(),
                                                               column_schema.frac()));
    return Status::OK();
}
1420
1421
0
Status SchemaChangeJob::_validate_alter_result(const TAlterTabletReqV2& request) {
1422
0
    Version max_continuous_version = {-1, 0};
1423
0
    _new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
1424
0
    LOG(INFO) << "find max continuous version of tablet=" << _new_tablet->tablet_id()
1425
0
              << ", start_version=" << max_continuous_version.first
1426
0
              << ", end_version=" << max_continuous_version.second;
1427
0
    if (max_continuous_version.second < request.alter_version) {
  Branch (1427:9): [True: 0, False: 0]
1428
0
        return Status::InternalError("result version={} is less than request version={}",
1429
0
                                     max_continuous_version.second, request.alter_version);
1430
0
    }
1431
1432
0
    std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
1433
0
    {
1434
0
        std::shared_lock rdlock(_new_tablet->get_header_lock());
1435
0
        _new_tablet->acquire_version_and_rowsets(&version_rowsets);
1436
0
    }
1437
0
    for (auto& pair : version_rowsets) {
  Branch (1437:21): [True: 0, False: 0]
1438
0
        RowsetSharedPtr rowset = pair.second;
1439
0
        if (!rowset->check_file_exist()) {
  Branch (1439:13): [True: 0, False: 0]
1440
0
            return Status::Error<NOT_FOUND>(
1441
0
                    "SchemaChangeJob::_validate_alter_result meet invalid rowset");
1442
0
        }
1443
0
    }
1444
0
    return Status::OK();
1445
0
}
1446
1447
// For unique with merge-on-write table, should process delete bitmap here.
1448
// 1. During double write, the newly imported rowsets do not calculate
1449
// delete bitmap and publish successfully.
1450
// 2. After conversion, calculate delete bitmap for the rowsets imported
1451
// during double write. During this period, new data can still be imported
1452
// without calculating delete bitmap and publish successfully.
1453
// 3. Block the new publish, calculate the delete bitmap of the
1454
// incremental rowsets.
1455
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
1456
// data will calculate delete bitmap.
1457
0
Status SchemaChangeJob::_calc_delete_bitmap_for_mow_table(int64_t alter_version) {
1458
0
    DBUG_EXECUTE_IF("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
  Branch (41:15): [True: 0, False: 0]
42
0
        }                                                                     \
43
0
    }
1459
0
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
1460
0
            LOG_WARNING("SchemaChangeJob._calc_delete_bitmap_for_mow_table.random_failed");
1461
0
            return Status::InternalError("debug schema change calc delete bitmap random failed");
1462
0
        }
1463
0
    });
1464
1465
    // can't do compaction when calc delete bitmap, if the rowset being calculated does
1466
    // a compaction, it may cause the delete bitmap to be missed.
1467
0
    std::lock_guard base_compaction_lock(_new_tablet->get_base_compaction_lock());
1468
0
    std::lock_guard cumu_compaction_lock(_new_tablet->get_cumulative_compaction_lock());
1469
1470
    // step 2
1471
0
    int64_t max_version = _new_tablet->max_version().second;
1472
0
    std::vector<RowsetSharedPtr> rowsets;
1473
0
    if (alter_version < max_version) {
  Branch (1473:9): [True: 0, False: 0]
1474
0
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
1475
0
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
1476
0
                  << " new_tablet=" << _new_tablet->tablet_id();
1477
0
        std::shared_lock rlock(_new_tablet->get_header_lock());
1478
0
        auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
1479
0
                {alter_version + 1, max_version}, CaptureRowsetOps {}));
1480
0
        rowsets = std::move(ret.rowsets);
1481
0
    }
1482
0
    for (auto rowset_ptr : rowsets) {
  Branch (1482:26): [True: 0, False: 0]
1483
0
        std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
1484
0
        std::shared_lock rlock(_new_tablet->get_header_lock());
1485
0
        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1486
0
    }
1487
1488
    // step 3
1489
0
    std::lock_guard rwlock(_new_tablet->get_rowset_update_lock());
1490
0
    std::lock_guard new_wlock(_new_tablet->get_header_lock());
1491
0
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
Line
Count
Source
31
0
    SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
Line
Count
Source
41
0
    using namespace std::chrono_literals;                                               \
42
0
    auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();               \
43
0
    SCOPED_CLEANUP({                                                                    \
Line
Count
Source
34
0
    auto VARNAME_LINENUM(scoped_cleanup) = MakeScopedCleanup([&] { func_body });
44
0
        auto VARNAME_LINENUM(timeout_us) =                                              \
45
0
                std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
46
0
        auto VARNAME_LINENUM(cost_us) =                                                 \
47
0
                doris::MonotonicMicros() - VARNAME_LINENUM(scoped_simple_trace);        \
48
0
        if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {                  \
49
0
            stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);            \
50
0
        }                                                                               \
51
0
    })
1492
0
    int64_t new_max_version = _new_tablet->max_version_unlocked();
1493
0
    rowsets.clear();
1494
0
    if (max_version < new_max_version) {
  Branch (1494:9): [True: 0, False: 0]
1495
0
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
1496
0
                  << "incremental rowsets for version: " << max_version + 1 << "-"
1497
0
                  << new_max_version << " new_tablet=" << _new_tablet->tablet_id();
1498
0
        auto ret = DORIS_TRY(_new_tablet->capture_consistent_rowsets_unlocked(
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
1499
0
                {max_version + 1, new_max_version}, CaptureRowsetOps {}));
1500
0
        rowsets = std::move(ret.rowsets);
1501
0
    }
1502
0
    for (auto&& rowset_ptr : rowsets) {
  Branch (1502:28): [True: 0, False: 0]
1503
0
        RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(_new_tablet, rowset_ptr));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1504
0
    }
1505
    // step 4
1506
0
    RETURN_IF_ERROR(_new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1507
0
    _new_tablet->save_meta();
1508
0
    return Status::OK();
1509
0
}
1510
1511
} // namespace doris