Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_schema_change_job.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_schema_change_job.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/cloud.pb.h>
22
23
#include <algorithm>
24
#include <chrono>
25
#include <memory>
26
#include <mutex>
27
#include <random>
28
#include <thread>
29
30
#include "cloud/cloud_meta_mgr.h"
31
#include "cloud/cloud_tablet_mgr.h"
32
#include "common/status.h"
33
#include "service/backend_options.h"
34
#include "storage/delete/delete_handler.h"
35
#include "storage/index/inverted/inverted_index_desc.h"
36
#include "storage/olap_define.h"
37
#include "storage/rowset/beta_rowset.h"
38
#include "storage/rowset/rowset.h"
39
#include "storage/rowset/rowset_factory.h"
40
#include "storage/storage_engine.h"
41
#include "storage/tablet/tablet.h"
42
#include "storage/tablet/tablet_fwd.h"
43
#include "storage/tablet/tablet_meta.h"
44
#include "util/debug_points.h"
45
46
namespace doris {
47
using namespace ErrorCode;
48
49
static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
50
static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
51
52
std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
53
11.7k
                                               int64_t mem_limit) {
54
11.7k
    if (sc_sorting) {
55
8.67k
        return std::make_unique<VBaseSchemaChangeWithSorting>(changer, mem_limit);
56
8.67k
    }
57
    // else sc_directly
58
3.04k
    return std::make_unique<VSchemaChangeDirectly>(changer);
59
11.7k
}
60
61
CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine,
62
                                           std::string job_id, int64_t expiration)
63
11.9k
        : _cloud_storage_engine(cloud_storage_engine),
64
11.9k
          _job_id(std::move(job_id)),
65
11.9k
          _expiration(expiration) {
66
11.9k
    _initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
67
11.9k
                 std::numeric_limits<int64_t>::max();
68
11.9k
}
69
70
11.9k
CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;
71
72
11.9k
Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
73
11.9k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.block", DBUG_BLOCK);
74
    // new tablet has to exist
75
11.9k
    _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id));
76
11.9k
    if (_new_tablet->tablet_state() == TABLET_RUNNING) {
77
190
        LOG(INFO) << "schema change job has already finished. base_tablet_id="
78
190
                  << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id
79
190
                  << ", alter_version=" << request.alter_version << ", job_id=" << _job_id;
80
190
        return Status::OK();
81
190
    }
82
83
11.7k
    _base_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.base_tablet_id));
84
85
11.7k
    static constexpr long TRY_LOCK_TIMEOUT = 30;
86
11.7k
    std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(), std::defer_lock);
87
11.7k
    bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT));
88
89
11.7k
    _new_tablet->set_alter_failed(false);
90
11.7k
    Defer defer([this] {
91
        // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
92
11.2k
        if (_new_tablet->tablet_state() != TABLET_RUNNING) {
93
82
            _new_tablet->set_alter_failed(true);
94
82
        }
95
11.2k
    });
96
97
11.7k
    if (!owns_lock) {
98
0
        LOG(WARNING) << "Failed to obtain schema change lock, there might be inverted index being "
99
0
                        "built on base_tablet="
100
0
                     << request.base_tablet_id;
101
0
        return Status::Error<TRY_LOCK_FAILED>(
102
0
                "Failed to obtain schema change lock, there might be inverted index being "
103
0
                "built on base_tablet=",
104
0
                request.base_tablet_id);
105
0
    }
106
    // MUST sync rowsets before capturing rowset readers and building DeleteHandler
107
11.7k
    SyncOptions options;
108
11.7k
    options.query_version = request.alter_version;
109
11.7k
    RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
110
    // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1]
111
11.7k
    _output_cumulative_point = _base_tablet->cumulative_layer_point();
112
11.7k
    std::vector<RowSetSplits> rs_splits;
113
11.7k
    int64_t base_max_version = _base_tablet->max_version_unlocked();
114
11.7k
    cloud::TabletJobInfoPB job;
115
11.7k
    auto* idx = job.mutable_idx();
116
11.7k
    idx->set_tablet_id(_base_tablet->tablet_id());
117
11.7k
    idx->set_table_id(_base_tablet->table_id());
118
11.7k
    idx->set_index_id(_base_tablet->index_id());
119
11.7k
    idx->set_partition_id(_base_tablet->partition_id());
120
11.7k
    auto* sc_job = job.mutable_schema_change();
121
11.7k
    sc_job->set_id(_job_id);
122
11.7k
    sc_job->set_initiator(BackendOptions::get_localhost() + ':' +
123
11.7k
                          std::to_string(config::heartbeat_service_port));
124
11.7k
    sc_job->set_alter_version(base_max_version);
125
11.7k
    auto* new_tablet_idx = sc_job->mutable_new_tablet_idx();
126
11.7k
    new_tablet_idx->set_tablet_id(_new_tablet->tablet_id());
127
11.7k
    new_tablet_idx->set_table_id(_new_tablet->table_id());
128
11.7k
    new_tablet_idx->set_index_id(_new_tablet->index_id());
129
11.7k
    new_tablet_idx->set_partition_id(_new_tablet->partition_id());
130
11.7k
    cloud::StartTabletJobResponse start_resp;
131
11.7k
    auto st = _cloud_storage_engine.meta_mgr().prepare_tablet_job(job, &start_resp);
132
11.7k
    if (!st.ok()) {
133
0
        if (start_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
134
0
            st = _new_tablet->sync_rowsets();
135
0
            if (!st.ok()) {
136
0
                LOG_WARNING("failed to sync new tablet")
137
0
                        .tag("tablet_id", _new_tablet->tablet_id())
138
0
                        .error(st);
139
0
            }
140
0
            return Status::OK();
141
0
        }
142
0
        return st;
143
0
    }
144
11.7k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
145
11.7k
        auto res =
146
11.7k
                Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}",
147
11.7k
                                      request.base_tablet_id, request.new_tablet_id);
148
11.7k
        LOG(WARNING) << "inject error. res=" << res;
149
11.7k
        return res;
150
11.7k
    });
151
11.7k
    if (request.alter_version > 1) {
152
        // [0-1] is a placeholder rowset, no need to convert
153
5.56k
        RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
154
5.56k
                                                         &rs_splits,
155
5.56k
                                                         {.skip_missing_versions = false,
156
5.56k
                                                          .enable_prefer_cached_rowset = false,
157
5.56k
                                                          .query_freshness_tolerance_ms = -1}));
158
5.56k
    }
159
11.7k
    Defer defer2 {[&]() {
160
11.3k
        _new_tablet->set_alter_version(-1);
161
11.3k
        _base_tablet->set_alter_version(-1);
162
11.3k
    }};
163
11.7k
    _new_tablet->set_alter_version(start_resp.alter_version());
164
11.7k
    _base_tablet->set_alter_version(start_resp.alter_version());
165
11.7k
    LOG(INFO) << "Begin to alter tablet. base_tablet_id=" << request.base_tablet_id
166
11.7k
              << ", new_tablet_id=" << request.new_tablet_id
167
11.7k
              << ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id;
168
11.7k
    sc_job->set_alter_version(start_resp.alter_version());
169
170
    // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.
171
172
    // Create a new tablet schema, should merge with dropped columns in light weight schema change
173
11.7k
    _base_tablet_schema = std::make_shared<TabletSchema>();
174
11.7k
    _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
175
11.7k
    _new_tablet_schema = _new_tablet->tablet_schema();
176
177
11.7k
    std::vector<ColumnId> return_columns;
178
11.7k
    return_columns.resize(_base_tablet_schema->num_columns());
179
11.7k
    std::iota(return_columns.begin(), return_columns.end(), 0);
180
181
    // delete handlers to filter out deleted rows
182
11.7k
    DeleteHandler delete_handler;
183
11.7k
    std::vector<RowsetMetaSharedPtr> delete_predicates;
184
11.7k
    for (auto& split : rs_splits) {
185
9.20k
        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
186
9.20k
        if (rs_meta->has_delete_predicate()) {
187
54
            _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
188
54
            delete_predicates.push_back(rs_meta);
189
54
        }
190
9.20k
    }
191
11.7k
    RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates,
192
11.7k
                                        start_resp.alter_version()));
193
194
    // reader_context is stack variables, it's lifetime MUST keep the same with rs_readers
195
11.7k
    RowsetReaderContext reader_context;
196
11.7k
    reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
197
11.7k
    reader_context.tablet_schema = _base_tablet_schema;
198
11.7k
    reader_context.need_ordered_result = true;
199
11.7k
    reader_context.delete_handler = &delete_handler;
200
11.7k
    reader_context.return_columns = &return_columns;
201
11.7k
    reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
202
11.7k
    reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
203
11.7k
    reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
204
11.7k
    reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr();
205
11.7k
    reader_context.version = Version(0, start_resp.alter_version());
206
11.7k
    std::vector<uint32_t> cluster_key_idxes;
207
11.7k
    if (!_base_tablet_schema->cluster_key_uids().empty()) {
208
698
        for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
209
698
            cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
210
698
        }
211
132
        reader_context.read_orderby_key_columns = &cluster_key_idxes;
212
132
        reader_context.is_unique = false;
213
132
        reader_context.sequence_id_idx = -1;
214
132
    }
215
216
11.7k
    for (auto& split : rs_splits) {
217
9.19k
        RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
218
9.19k
    }
219
220
11.7k
    SchemaChangeParams sc_params;
221
222
    // cache schema change output to file cache
223
11.7k
    std::vector<RowsetSharedPtr> rowsets;
224
11.7k
    rowsets.resize(rs_splits.size());
225
11.7k
    std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
226
11.7k
                   [](RowSetSplits& split) { return split.rs_reader->rowset(); });
227
11.7k
    sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);
228
11.7k
    if (request.__isset.query_globals && request.__isset.query_options) {
229
11.7k
        sc_params.runtime_state =
230
11.7k
                std::make_shared<RuntimeState>(request.query_options, request.query_globals);
231
11.7k
    } else {
232
        // for old version request compatibility
233
2
        sc_params.runtime_state = std::make_shared<RuntimeState>();
234
2
    }
235
236
11.7k
    RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
237
11.7k
    sc_params.ref_rowset_readers.reserve(rs_splits.size());
238
11.7k
    for (RowSetSplits& split : rs_splits) {
239
9.20k
        sc_params.ref_rowset_readers.emplace_back(std::move(split.rs_reader));
240
9.20k
    }
241
11.7k
    sc_params.delete_handler = &delete_handler;
242
11.7k
    sc_params.be_exec_version = request.be_exec_version;
243
11.7k
    DCHECK(request.__isset.alter_tablet_type);
244
11.7k
    switch (request.alter_tablet_type) {
245
3.11k
    case TAlterTabletType::SCHEMA_CHANGE:
246
3.11k
        sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
247
3.11k
        break;
248
8.47k
    case TAlterTabletType::ROLLUP:
249
8.47k
        sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
250
8.47k
        break;
251
0
    case TAlterTabletType::MIGRATION:
252
0
        sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
253
0
        break;
254
11.7k
    }
255
11.6k
    sc_params.vault_id = request.storage_vault_id;
256
11.6k
    if (!request.__isset.materialized_view_params) {
257
4.39k
        return _convert_historical_rowsets(sc_params, job);
258
4.39k
    }
259
27.5k
    for (auto item : request.materialized_view_params) {
260
27.5k
        AlterMaterializedViewParam mv_param;
261
27.5k
        mv_param.column_name = item.column_name;
262
        /*
263
         * origin_column_name is always be set now,
264
         * but origin_column_name may be not set in some materialized view function. eg:count(1)
265
        */
266
27.5k
        if (item.__isset.origin_column_name) {
267
0
            mv_param.origin_column_name = item.origin_column_name;
268
0
        }
269
270
27.8k
        if (item.__isset.mv_expr) {
271
27.8k
            mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
272
27.8k
        }
273
27.5k
        sc_params.materialized_params_map.insert(
274
27.5k
                std::make_pair(to_lower(item.column_name), mv_param));
275
27.5k
    }
276
7.21k
    sc_params.enable_unique_key_merge_on_write = _new_tablet->enable_unique_key_merge_on_write();
277
7.21k
    return _convert_historical_rowsets(sc_params, job);
278
11.6k
}
279
280
Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
281
11.6k
                                                         cloud::TabletJobInfoPB& job) {
282
11.6k
    LOG(INFO) << "Begin to convert historical rowsets for new_tablet from base_tablet. base_tablet="
283
11.6k
              << _base_tablet->tablet_id() << ", new_tablet=" << _new_tablet->tablet_id()
284
11.6k
              << ", job_id=" << _job_id;
285
286
    // Add filter information in change, and filter column information will be set in _parse_request
287
    // And filter some data every time the row block changes
288
11.6k
    BlockChanger changer(_new_tablet->tablet_schema(), *sc_params.desc_tbl,
289
11.6k
                         sc_params.runtime_state);
290
291
11.6k
    bool sc_sorting = false;
292
11.6k
    bool sc_directly = false;
293
294
    // 1. Parse the Alter request and convert it into an internal representation
295
11.6k
    RETURN_IF_ERROR(SchemaChangeJob::parse_request(sc_params, _base_tablet_schema.get(),
296
11.6k
                                                   _new_tablet_schema.get(), &changer, &sc_sorting,
297
11.6k
                                                   &sc_directly));
298
11.6k
    if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) {
299
0
        LOG(INFO) << "Don't support to add materialized view by linked schema change";
300
0
        return Status::InternalError(
301
0
                "Don't support to add materialized view by linked schema change");
302
0
    }
303
304
11.6k
    LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
305
11.6k
              << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id()
306
11.6k
              << ", new_tablet=" << _new_tablet->tablet_id();
307
308
    // 2. Generate historical data converter
309
11.6k
    auto sc_procedure = get_sc_procedure(
310
11.6k
            changer, sc_sorting,
311
11.6k
            _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
312
313
11.6k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK);
314
315
    // 3. Convert historical data
316
11.6k
    bool already_exist_any_version = false;
317
11.6k
    for (const auto& rs_reader : sc_params.ref_rowset_readers) {
318
9.05k
        VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version();
319
320
9.05k
        RowsetWriterContext context;
321
9.05k
        context.txn_id = rs_reader->rowset()->txn_id();
322
9.05k
        context.txn_expiration = _expiration;
323
9.05k
        context.version = rs_reader->version();
324
9.05k
        context.rowset_state = VISIBLE;
325
9.05k
        context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
326
9.05k
        context.tablet_schema = _new_tablet->tablet_schema();
327
9.05k
        context.newest_write_timestamp = rs_reader->newest_write_timestamp();
328
9.05k
        context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
329
9.05k
        context.job_id = _job_id;
330
9.05k
        context.write_file_cache = sc_params.output_to_file_cache;
331
9.05k
        context.tablet = _new_tablet;
332
9.05k
        if (!context.storage_resource) {
333
0
            return Status::InternalError("vault id not found, maybe not sync, vault id {}",
334
0
                                         sc_params.vault_id);
335
0
        }
336
337
9.05k
        context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
338
        // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
339
9.05k
        bool vertical = false;
340
9.05k
        if (sc_sorting && !_new_tablet->tablet_schema()->cluster_key_uids().empty()) {
341
            // see VBaseSchemaChangeWithSorting::_external_sorting
342
0
            vertical = true;
343
0
        }
344
9.05k
        auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, vertical));
345
346
9.05k
        RowsetMetaSharedPtr existed_rs_meta;
347
9.05k
        auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
348
9.05k
                                                                  _job_id, &existed_rs_meta);
349
9.05k
        if (!st.ok()) {
350
1
            if (st.is<ALREADY_EXIST>()) {
351
1
                LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
352
1
                          << _new_tablet->tablet_id();
353
                // Add already committed rowset to _output_rowsets.
354
1
                DCHECK(existed_rs_meta != nullptr);
355
1
                RowsetSharedPtr rowset;
356
                // schema is nullptr implies using RowsetMeta.tablet_schema
357
1
                RETURN_IF_ERROR(
358
1
                        RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset));
359
1
                _output_rowsets.push_back(std::move(rowset));
360
1
                already_exist_any_version = true;
361
1
                continue;
362
1
            } else {
363
0
                return st;
364
0
            }
365
1
        }
366
367
9.05k
        st = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet,
368
9.05k
                                   _base_tablet_schema, _new_tablet_schema);
369
9.05k
        if (!st.ok()) {
370
82
            return Status::InternalError(
371
82
                    "failed to process schema change on rowset, version=[{}-{}], status={}",
372
82
                    rs_reader->version().first, rs_reader->version().second, st.to_string());
373
82
        }
374
375
8.97k
        RowsetSharedPtr new_rowset;
376
8.97k
        st = rowset_writer->build(new_rowset);
377
8.97k
        if (!st.ok()) {
378
0
            return Status::InternalError("failed to build rowset, version=[{}-{}] status={}",
379
0
                                         rs_reader->version().first, rs_reader->version().second,
380
0
                                         st.to_string());
381
0
        }
382
383
8.97k
        st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
384
8.97k
                                                            &existed_rs_meta);
385
8.97k
        if (!st.ok()) {
386
0
            if (st.is<ALREADY_EXIST>()) {
387
0
                LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
388
0
                          << _new_tablet->tablet_id();
389
                // Add already committed rowset to _output_rowsets.
390
0
                DCHECK(existed_rs_meta != nullptr);
391
0
                RowsetSharedPtr rowset;
392
                // schema is nullptr implies using RowsetMeta.tablet_schema
393
0
                RETURN_IF_ERROR(
394
0
                        RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset));
395
0
                _output_rowsets.push_back(std::move(rowset));
396
0
                continue;
397
0
            } else {
398
0
                return st;
399
0
            }
400
0
        }
401
8.97k
        _output_rowsets.push_back(std::move(new_rowset));
402
403
8.97k
        VLOG_TRACE << "Successfully convert a history version " << rs_reader->version();
404
8.97k
    }
405
11.5k
    auto* sc_job = job.mutable_schema_change();
406
11.5k
    if (!sc_params.ref_rowset_readers.empty()) {
407
5.47k
        int64_t num_output_rows = 0;
408
5.47k
        int64_t size_output_rowsets = 0;
409
5.47k
        int64_t num_output_segments = 0;
410
5.47k
        int64_t index_size_output_rowsets = 0;
411
5.47k
        int64_t segment_size_output_rowsets = 0;
412
8.82k
        for (auto& rs : _output_rowsets) {
413
8.82k
            sc_job->add_txn_ids(rs->txn_id());
414
8.82k
            sc_job->add_output_versions(rs->end_version());
415
8.82k
            num_output_rows += rs->num_rows();
416
8.82k
            size_output_rowsets += rs->total_disk_size();
417
8.82k
            num_output_segments += rs->num_segments();
418
8.82k
            index_size_output_rowsets += rs->index_disk_size();
419
8.82k
            segment_size_output_rowsets += rs->data_disk_size();
420
8.82k
        }
421
5.47k
        sc_job->set_num_output_rows(num_output_rows);
422
5.47k
        sc_job->set_size_output_rowsets(size_output_rowsets);
423
5.47k
        sc_job->set_num_output_segments(num_output_segments);
424
5.47k
        sc_job->set_num_output_rowsets(_output_rowsets.size());
425
5.47k
        sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
426
5.47k
        sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
427
5.47k
    }
428
11.5k
    _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
429
11.5k
    sc_job->set_output_cumulative_point(_output_cumulative_point);
430
431
11.5k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", DBUG_BLOCK);
432
    // process delete bitmap if the table is MOW
433
11.5k
    bool has_stop_token {false};
434
11.5k
    bool should_clear_stop_token {true};
435
11.5k
    Defer defer {[&]() {
436
11.4k
        if (has_stop_token) {
437
1.18k
            static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(
438
1.18k
                    _new_tablet, should_clear_stop_token));
439
1.18k
        }
440
11.4k
    }};
441
11.5k
    if (_new_tablet->enable_unique_key_merge_on_write()) {
442
1.17k
        has_stop_token = true;
443
        // If there are historical versions of rowsets, we need to recalculate their delete
444
        // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
445
1.17k
        int64_t start_calc_delete_bitmap_version =
446
                // [0-1] is a placeholder rowset, start from 2.
447
1.17k
                already_exist_any_version ? 2 : sc_job->alter_version() + 1;
448
1.17k
        RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
449
1.17k
                                               start_calc_delete_bitmap_version, _initiator,
450
1.17k
                                               sc_params.vault_id));
451
1.17k
        sc_job->set_delete_bitmap_lock_initiator(_initiator);
452
1.17k
    }
453
454
11.5k
    cloud::FinishTabletJobResponse finish_resp;
455
11.5k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", {
456
11.5k
        std::srand(static_cast<unsigned int>(std::time(nullptr)));
457
11.5k
        int random_value = std::rand() % 100;
458
11.5k
        if (random_value < 20) {
459
11.5k
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test txn conflict");
460
11.5k
        }
461
11.5k
    });
462
11.5k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job", {
463
11.5k
        LOG_INFO("inject retryable error before commit sc job, tablet={}",
464
11.5k
                 _new_tablet->tablet_id());
465
11.5k
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected retryable error");
466
11.5k
    });
467
11.5k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job",
468
11.5k
                    DBUG_BLOCK);
469
11.5k
    auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
470
11.5k
    if (!st.ok()) {
471
0
        if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
472
0
            st = _new_tablet->sync_rowsets();
473
0
            if (!st.ok()) {
474
0
                LOG_WARNING("failed to sync new tablet")
475
0
                        .tag("tablet_id", _new_tablet->tablet_id())
476
0
                        .error(st);
477
0
            }
478
0
            return Status::OK();
479
0
        }
480
0
        return st;
481
11.5k
    } else {
482
11.5k
        should_clear_stop_token = false;
483
11.5k
    }
484
11.5k
    const auto& stats = finish_resp.stats();
485
11.5k
    {
486
        // to prevent the converted historical rowsets be replaced by rowsets written on new tablet
487
        // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
488
11.5k
        std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
489
11.5k
        std::unique_lock wlock(_new_tablet->get_header_lock());
490
11.5k
        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
491
11.5k
        _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
492
11.5k
        _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
493
11.5k
                                             stats.num_rows(), stats.data_size());
494
11.5k
        RETURN_IF_ERROR(_new_tablet->set_tablet_state(TABLET_RUNNING));
495
11.5k
    }
496
11.5k
    return Status::OK();
497
11.5k
}
498
499
Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
500
                                                    int64_t start_calc_delete_bitmap_version,
501
                                                    int64_t initiator,
502
1.17k
                                                    const std::string& vault_id) {
503
1.17k
    LOG_INFO("process mow table")
504
1.17k
            .tag("new_tablet_id", _new_tablet->tablet_id())
505
1.17k
            .tag("out_rowset_size", _output_rowsets.size())
506
1.17k
            .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version)
507
1.17k
            .tag("alter_version", alter_version);
508
1.17k
    RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
509
1.17k
    TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
510
1.17k
    tmp_meta->delete_bitmap().delete_bitmap.clear();
511
    // Keep only version [0-1] rowset, other rowsets will be added in _output_rowsets
512
1.17k
    auto& rs_metas = tmp_meta->all_mutable_rs_metas();
513
2.37k
    for (auto it = rs_metas.begin(); it != rs_metas.end();) {
514
1.19k
        const auto& rs_meta = it->second;
515
1.19k
        if (rs_meta->version().first == 0 && rs_meta->version().second == 1) {
516
1.18k
            ++it;
517
1.18k
        } else {
518
15
            it = rs_metas.erase(it);
519
15
        }
520
1.19k
    }
521
522
1.17k
    std::shared_ptr<CloudTablet> tmp_tablet =
523
1.17k
            std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
524
1.17k
    {
525
1.17k
        std::unique_lock wlock(tmp_tablet->get_header_lock());
526
1.17k
        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
527
        // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
528
1.17k
        tmp_tablet->set_alter_version(alter_version);
529
1.17k
    }
530
531
    // step 1, process incremental rowset without delete bitmap update lock
532
1.17k
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
533
1.17k
    int64_t max_version = tmp_tablet->max_version().second;
534
1.17k
    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
535
1.17k
              << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
536
1.17k
              << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
537
1.17k
    if (max_version >= start_calc_delete_bitmap_version) {
538
8
        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
539
8
                {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
540
8
        DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
541
8
                        DBUG_BLOCK);
542
8
        {
543
8
            std::unique_lock wlock(tmp_tablet->get_header_lock());
544
8
            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
545
8
        }
546
17
        for (auto rowset : ret.rowsets) {
547
17
            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
548
17
        }
549
8
    }
550
551
1.17k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.before_new_inc.block",
552
1.17k
                    DBUG_BLOCK);
553
554
    // step 2, process incremental rowset with delete bitmap update lock
555
1.17k
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
556
1.17k
            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
557
1.17k
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
558
1.17k
    int64_t new_max_version = tmp_tablet->max_version().second;
559
1.17k
    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
560
1.17k
              << "incremental rowsets with lock, version: " << max_version + 1 << "-"
561
1.17k
              << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
562
1.17k
    if (new_max_version > max_version) {
563
2
        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
564
2
                {max_version + 1, new_max_version}, CaptureRowsetOps {}));
565
2
        {
566
2
            std::unique_lock wlock(tmp_tablet->get_header_lock());
567
2
            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
568
2
        }
569
9
        for (auto rowset : ret.rowsets) {
570
9
            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
571
9
        }
572
2
    }
573
574
1.17k
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.inject_sleep", {
575
1.17k
        auto p = dp->param("percent", 0.01);
576
1.17k
        auto sleep_time = dp->param("sleep", 100);
577
1.17k
        std::mt19937 gen {std::random_device {}()};
578
1.17k
        std::bernoulli_distribution inject_fault {p};
579
1.17k
        if (inject_fault(gen)) {
580
1.17k
            LOG_INFO("injection sleep for {} seconds, tablet_id={}, sc job_id={}", sleep_time,
581
1.17k
                     _new_tablet->tablet_id(), _job_id);
582
1.17k
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
583
1.17k
        }
584
1.17k
    });
585
586
1.17k
    auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
587
1.17k
    auto storage_resource = _cloud_storage_engine.get_storage_resource(vault_id);
588
    // step4, store delete bitmap
589
1.17k
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
590
1.17k
            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap,
591
1.17k
            &delete_bitmap, "", storage_resource, config::delete_bitmap_store_write_version));
592
593
1.17k
    _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
594
1.17k
    return Status::OK();
595
1.17k
}
596
597
82
void CloudSchemaChangeJob::clean_up_on_failure() {
598
82
    if (_new_tablet == nullptr) {
599
0
        return;
600
0
    }
601
82
    if (_new_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
602
82
        _new_tablet->enable_unique_key_merge_on_write()) {
603
35
        _cloud_storage_engine.meta_mgr().remove_delete_bitmap_update_lock(
604
35
                _new_tablet->table_id(), SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, _initiator,
605
35
                _new_tablet->tablet_id());
606
35
    }
607
131
    for (const auto& output_rs : _output_rowsets) {
608
131
        if (output_rs.use_count() > 2) {
609
0
            LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has "
610
0
                         << output_rs.use_count()
611
0
                         << " references. File Cache won't be recycled when query is using it.";
612
0
            return;
613
0
        }
614
131
        output_rs->clear_cache();
615
131
    }
616
82
}
617
618
bool CloudSchemaChangeJob::_should_cache_sc_output(
619
11.4k
        const std::vector<RowsetSharedPtr>& input_rowsets) {
620
11.4k
    int64_t total_size = 0;
621
11.4k
    int64_t cached_index_size = 0;
622
11.4k
    int64_t cached_data_size = 0;
623
624
11.4k
    for (const auto& rs : input_rowsets) {
625
9.17k
        const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
626
9.17k
        total_size += rs_meta->total_disk_size();
627
9.17k
        cached_index_size += rs->approximate_cache_index_size();
628
9.17k
        cached_data_size += rs->approximate_cached_data_size();
629
9.17k
    }
630
631
11.4k
    double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;
632
633
11.4k
    LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
634
11.4k
              << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
635
11.4k
              << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
636
11.4k
              << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
637
11.4k
              << ", min_hit_ratio_threshold="
638
11.4k
              << config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache="
639
11.4k
              << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);
640
641
11.4k
    if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
642
1.68k
        return true;
643
1.68k
    }
644
645
9.77k
    return false;
646
11.4k
}
647
648
} // namespace doris