Coverage Report

Created: 2026-04-24 20:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_schema_change_job.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_schema_change_job.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/cloud.pb.h>
22
23
#include <algorithm>
24
#include <chrono>
25
#include <memory>
26
#include <mutex>
27
#include <random>
28
#include <ranges>
29
#include <thread>
30
31
#include "cloud/cloud_meta_mgr.h"
32
#include "cloud/cloud_tablet_mgr.h"
33
#include "common/status.h"
34
#include "service/backend_options.h"
35
#include "storage/delete/delete_handler.h"
36
#include "storage/index/inverted/inverted_index_desc.h"
37
#include "storage/olap_define.h"
38
#include "storage/rowset/beta_rowset.h"
39
#include "storage/rowset/rowset.h"
40
#include "storage/rowset/rowset_factory.h"
41
#include "storage/storage_engine.h"
42
#include "storage/tablet/tablet.h"
43
#include "storage/tablet/tablet_fwd.h"
44
#include "storage/tablet/tablet_meta.h"
45
#include "util/debug_points.h"
46
47
namespace doris {
48
using namespace ErrorCode;
49
50
static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
51
static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
52
53
std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer, bool sc_sorting,
54
0
                                               int64_t mem_limit) {
55
0
    if (sc_sorting) {
56
0
        return std::make_unique<VBaseSchemaChangeWithSorting>(changer, mem_limit);
57
0
    }
58
    // else sc_directly
59
0
    return std::make_unique<VSchemaChangeDirectly>(changer);
60
0
}
61
62
CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_engine,
63
                                           std::string job_id, int64_t expiration)
64
3
        : _cloud_storage_engine(cloud_storage_engine),
65
3
          _job_id(std::move(job_id)),
66
3
          _expiration(expiration) {
67
3
    _initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
68
3
                 std::numeric_limits<int64_t>::max();
69
3
}
70
71
3
CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;
72
73
3
Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& request) {
74
3
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.block", DBUG_BLOCK);
75
    // new tablet has to exist
76
3
    _new_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.new_tablet_id));
77
3
    if (_new_tablet->tablet_state() == TABLET_RUNNING) {
78
0
        LOG(INFO) << "schema change job has already finished. base_tablet_id="
79
0
                  << request.base_tablet_id << ", new_tablet_id=" << request.new_tablet_id
80
0
                  << ", alter_version=" << request.alter_version << ", job_id=" << _job_id;
81
0
        return Status::OK();
82
0
    }
83
84
3
    _base_tablet = DORIS_TRY(_cloud_storage_engine.tablet_mgr().get_tablet(request.base_tablet_id));
85
86
3
    static constexpr long TRY_LOCK_TIMEOUT = 30;
87
3
    std::unique_lock schema_change_lock(_base_tablet->get_schema_change_lock(), std::defer_lock);
88
3
    bool owns_lock = schema_change_lock.try_lock_for(std::chrono::seconds(TRY_LOCK_TIMEOUT));
89
90
3
    _new_tablet->set_alter_failed(false);
91
3
    Defer defer([this] {
92
        // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed.
93
3
        if (_new_tablet->tablet_state() != TABLET_RUNNING) {
94
3
            _new_tablet->set_alter_failed(true);
95
3
        }
96
3
    });
97
98
3
    if (!owns_lock) {
99
0
        LOG(WARNING) << "Failed to obtain schema change lock, there might be inverted index being "
100
0
                        "built on base_tablet="
101
0
                     << request.base_tablet_id;
102
0
        return Status::Error<TRY_LOCK_FAILED>(
103
0
                "Failed to obtain schema change lock, there might be inverted index being "
104
0
                "built on base_tablet=",
105
0
                request.base_tablet_id);
106
0
    }
107
    // MUST sync rowsets before capturing rowset readers and building DeleteHandler
108
3
    SyncOptions options;
109
3
    options.query_version = request.alter_version;
110
3
    RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
111
    // ATTN: Only convert rowsets of version larger than 1, MUST let the new tablet cache have rowset [0-1]
112
3
    _output_cumulative_point = _base_tablet->cumulative_layer_point();
113
3
    std::vector<RowSetSplits> rs_splits;
114
3
    int64_t base_max_version = _base_tablet->max_version_unlocked();
115
3
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.override_base_max_version", {
116
3
        auto v = dp->param<int64_t>("version", -1);
117
3
        if (v > 0) {
118
3
            LOG(INFO) << "override base_max_version from " << base_max_version << " to " << v;
119
3
            base_max_version = v;
120
3
        }
121
3
    });
122
3
    cloud::TabletJobInfoPB job;
123
3
    auto* idx = job.mutable_idx();
124
3
    idx->set_tablet_id(_base_tablet->tablet_id());
125
3
    idx->set_table_id(_base_tablet->table_id());
126
3
    idx->set_index_id(_base_tablet->index_id());
127
3
    idx->set_partition_id(_base_tablet->partition_id());
128
3
    auto* sc_job = job.mutable_schema_change();
129
3
    sc_job->set_id(_job_id);
130
3
    sc_job->set_initiator(BackendOptions::get_localhost() + ':' +
131
3
                          std::to_string(config::heartbeat_service_port));
132
3
    sc_job->set_alter_version(base_max_version);
133
3
    auto* new_tablet_idx = sc_job->mutable_new_tablet_idx();
134
3
    new_tablet_idx->set_tablet_id(_new_tablet->tablet_id());
135
3
    new_tablet_idx->set_table_id(_new_tablet->table_id());
136
3
    new_tablet_idx->set_index_id(_new_tablet->index_id());
137
3
    new_tablet_idx->set_partition_id(_new_tablet->partition_id());
138
3
    cloud::StartTabletJobResponse start_resp;
139
3
    auto st = _cloud_storage_engine.meta_mgr().prepare_tablet_job(job, &start_resp);
140
3
    if (!st.ok()) {
141
0
        if (start_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
142
0
            st = _new_tablet->sync_rowsets();
143
0
            if (!st.ok()) {
144
0
                LOG_WARNING("failed to sync new tablet")
145
0
                        .tag("tablet_id", _new_tablet->tablet_id())
146
0
                        .error(st);
147
0
            }
148
0
            return Status::OK();
149
0
        }
150
0
        return st;
151
0
    }
152
3
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.alter_fail", {
153
3
        auto res =
154
3
                Status::InternalError("inject alter tablet failed. base_tablet={}, new_tablet={}",
155
3
                                      request.base_tablet_id, request.new_tablet_id);
156
3
        LOG(WARNING) << "inject error. res=" << res;
157
3
        return res;
158
3
    });
159
160
    // Check for cross-V1 compaction rowsets on new tablet.
161
    // A compaction may have committed a rowset that crosses the alter_version
162
    // boundary (V1) before prepare_tablet_job's clear_compaction took effect.
163
    // If detected, abort the SC job in meta-service so the next retry registers
164
    // a fresh job with a higher V1 (where the crossing rowset falls within range).
165
3
    {
166
3
        RETURN_IF_ERROR(_new_tablet->sync_rowsets());
167
3
        std::shared_lock rlock(_new_tablet->get_header_lock());
168
3
        for (auto& [v, rs] : _new_tablet->rowset_map()) {
169
3
            if (v.first > 1 && v.first <= start_resp.alter_version() &&
170
3
                v.second > start_resp.alter_version()) {
171
3
                LOG(WARNING) << "cross-V1 compaction detected on new tablet"
172
3
                             << ", tablet_id=" << _new_tablet->tablet_id() << ", rowset=["
173
3
                             << v.first << "-" << v.second << "]"
174
3
                             << ", alter_version=" << start_resp.alter_version()
175
3
                             << ", job_id=" << _job_id << ". Aborting SC job and retrying.";
176
                // Abort the SC job so the next retry can register with a higher alter_version.
177
                // retry_rpc() may already have committed an ABORT but lost the reply; the
178
                // replay then sees INVALID_ARGUMENT "there is no running schema_change",
179
                // which means the job is in fact cleared. Treat that as success so FE can
180
                // still retry. Any other failure is ambiguous — the stale job may remain
181
                // and subsequent retries would hit meta-service idempotency, so return a
182
                // non-retryable error to avoid burning retries on a stuck state.
183
3
                auto abort_st = _cloud_storage_engine.meta_mgr().abort_tablet_job(job);
184
3
                bool job_already_cleared =
185
3
                        abort_st.is<ErrorCode::INVALID_ARGUMENT>() &&
186
3
                        abort_st.to_string().find("no running schema_change") != std::string::npos;
187
3
                if (!abort_st.ok() && !job_already_cleared) {
188
1
                    LOG(WARNING) << "failed to abort SC job after cross-V1 detection"
189
1
                                 << ", tablet_id=" << _new_tablet->tablet_id()
190
1
                                 << ", error=" << abort_st
191
1
                                 << ". Returning non-retryable error to avoid stale job retries.";
192
1
                    return Status::InternalError(
193
1
                            "cross-V1 compaction detected but failed to abort SC job, "
194
1
                            "tablet_id={}, rowset=[{}-{}], alter_version={}, abort_err={}",
195
1
                            _new_tablet->tablet_id(), v.first, v.second, start_resp.alter_version(),
196
1
                            abort_st.to_string());
197
1
                }
198
2
                if (job_already_cleared) {
199
1
                    LOG(INFO) << "SC job already cleared (idempotent abort replay), safe to retry"
200
1
                              << ", tablet_id=" << _new_tablet->tablet_id();
201
1
                }
202
2
                return Status::Error<ErrorCode::SC_COMPACTION_CONFLICT>(
203
2
                        "cross-V1 compaction detected on new tablet, tablet_id={}, "
204
2
                        "rowset=[{}-{}], alter_version={}",
205
2
                        _new_tablet->tablet_id(), v.first, v.second, start_resp.alter_version());
206
3
            }
207
3
        }
208
3
    }
209
210
0
    if (request.alter_version > 1) {
211
        // [0-1] is a placeholder rowset, no need to convert
212
0
        RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2, start_resp.alter_version()},
213
0
                                                         &rs_splits,
214
0
                                                         {.skip_missing_versions = false,
215
0
                                                          .enable_prefer_cached_rowset = false,
216
0
                                                          .query_freshness_tolerance_ms = -1}));
217
0
    }
218
    // Between prepare_tablet_job (SC job registered in meta-service) and
219
    // set_alter_version (local alter_version update). Used to test cross-V1 race.
220
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.after_prepare_job", DBUG_BLOCK);
221
222
0
    Defer defer2 {[&]() {
223
0
        _new_tablet->set_alter_version(-1);
224
0
        _base_tablet->set_alter_version(-1);
225
0
    }};
226
0
    _new_tablet->set_alter_version(start_resp.alter_version());
227
0
    _base_tablet->set_alter_version(start_resp.alter_version());
228
0
    LOG(INFO) << "Begin to alter tablet. base_tablet_id=" << request.base_tablet_id
229
0
              << ", new_tablet_id=" << request.new_tablet_id
230
0
              << ", alter_version=" << start_resp.alter_version() << ", job_id=" << _job_id;
231
0
    sc_job->set_alter_version(start_resp.alter_version());
232
233
    // FIXME(cyx): Should trigger compaction on base_tablet if there are too many rowsets to convert.
234
235
    // Create a new tablet schema, should merge with dropped columns in light weight schema change
236
0
    _base_tablet_schema = std::make_shared<TabletSchema>();
237
0
    _base_tablet_schema->update_tablet_columns(*_base_tablet->tablet_schema(), request.columns);
238
0
    _new_tablet_schema = _new_tablet->tablet_schema();
239
240
0
    std::vector<ColumnId> return_columns;
241
0
    return_columns.resize(_base_tablet_schema->num_columns());
242
0
    std::iota(return_columns.begin(), return_columns.end(), 0);
243
244
    // delete handlers to filter out deleted rows
245
0
    DeleteHandler delete_handler;
246
0
    std::vector<RowsetMetaSharedPtr> delete_predicates;
247
0
    for (auto& split : rs_splits) {
248
0
        auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
249
0
        if (rs_meta->has_delete_predicate()) {
250
0
            _base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
251
0
            delete_predicates.push_back(rs_meta);
252
0
        }
253
0
    }
254
0
    RETURN_IF_ERROR(delete_handler.init(_base_tablet_schema, delete_predicates,
255
0
                                        start_resp.alter_version()));
256
257
    // reader_context is stack variables, it's lifetime MUST keep the same with rs_readers
258
0
    RowsetReaderContext reader_context;
259
0
    reader_context.reader_type = ReaderType::READER_ALTER_TABLE;
260
0
    reader_context.tablet_schema = _base_tablet_schema;
261
0
    reader_context.need_ordered_result = true;
262
0
    reader_context.delete_handler = &delete_handler;
263
0
    reader_context.return_columns = &return_columns;
264
0
    reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
265
0
    reader_context.is_unique = _base_tablet->keys_type() == UNIQUE_KEYS;
266
0
    reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
267
0
    reader_context.delete_bitmap = _base_tablet->tablet_meta()->delete_bitmap_ptr();
268
0
    reader_context.version = Version(0, start_resp.alter_version());
269
0
    std::vector<uint32_t> cluster_key_idxes;
270
0
    if (!_base_tablet_schema->cluster_key_uids().empty()) {
271
0
        for (const auto& uid : _base_tablet_schema->cluster_key_uids()) {
272
0
            cluster_key_idxes.emplace_back(_base_tablet_schema->field_index(uid));
273
0
        }
274
0
        reader_context.read_orderby_key_columns = &cluster_key_idxes;
275
0
        reader_context.is_unique = false;
276
0
        reader_context.sequence_id_idx = -1;
277
0
    }
278
279
0
    for (auto& split : rs_splits) {
280
0
        RETURN_IF_ERROR(split.rs_reader->init(&reader_context));
281
0
    }
282
283
0
    SchemaChangeParams sc_params;
284
285
    // cache schema change output to file cache
286
0
    std::vector<RowsetSharedPtr> rowsets;
287
0
    rowsets.resize(rs_splits.size());
288
0
    std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
289
0
                   [](RowSetSplits& split) { return split.rs_reader->rowset(); });
290
0
    sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);
291
0
    if (request.__isset.query_globals && request.__isset.query_options) {
292
0
        sc_params.runtime_state =
293
0
                std::make_shared<RuntimeState>(request.query_options, request.query_globals);
294
0
    } else {
295
        // for old version request compatibility
296
0
        sc_params.runtime_state = std::make_shared<RuntimeState>();
297
0
    }
298
299
0
    RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
300
0
    sc_params.ref_rowset_readers.reserve(rs_splits.size());
301
0
    for (RowSetSplits& split : rs_splits) {
302
0
        sc_params.ref_rowset_readers.emplace_back(std::move(split.rs_reader));
303
0
    }
304
0
    sc_params.delete_handler = &delete_handler;
305
0
    sc_params.be_exec_version = request.be_exec_version;
306
0
    DCHECK(request.__isset.alter_tablet_type);
307
0
    switch (request.alter_tablet_type) {
308
0
    case TAlterTabletType::SCHEMA_CHANGE:
309
0
        sc_params.alter_tablet_type = AlterTabletType::SCHEMA_CHANGE;
310
0
        break;
311
0
    case TAlterTabletType::ROLLUP:
312
0
        sc_params.alter_tablet_type = AlterTabletType::ROLLUP;
313
0
        break;
314
0
    case TAlterTabletType::MIGRATION:
315
0
        sc_params.alter_tablet_type = AlterTabletType::MIGRATION;
316
0
        break;
317
0
    }
318
0
    sc_params.vault_id = request.storage_vault_id;
319
0
    if (!request.__isset.materialized_view_params) {
320
0
        return _convert_historical_rowsets(sc_params, job);
321
0
    }
322
0
    for (auto item : request.materialized_view_params) {
323
0
        AlterMaterializedViewParam mv_param;
324
0
        mv_param.column_name = item.column_name;
325
        /*
326
         * origin_column_name is always be set now,
327
         * but origin_column_name may be not set in some materialized view function. eg:count(1)
328
        */
329
0
        if (item.__isset.origin_column_name) {
330
0
            mv_param.origin_column_name = item.origin_column_name;
331
0
        }
332
333
0
        if (item.__isset.mv_expr) {
334
0
            mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
335
0
        }
336
0
        sc_params.materialized_params_map.insert(
337
0
                std::make_pair(to_lower(item.column_name), mv_param));
338
0
    }
339
0
    sc_params.enable_unique_key_merge_on_write = _new_tablet->enable_unique_key_merge_on_write();
340
0
    return _convert_historical_rowsets(sc_params, job);
341
0
}
342
343
Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
344
0
                                                         cloud::TabletJobInfoPB& job) {
345
0
    LOG(INFO) << "Begin to convert historical rowsets for new_tablet from base_tablet. base_tablet="
346
0
              << _base_tablet->tablet_id() << ", new_tablet=" << _new_tablet->tablet_id()
347
0
              << ", job_id=" << _job_id;
348
349
    // Add filter information in change, and filter column information will be set in _parse_request
350
    // And filter some data every time the row block changes
351
0
    BlockChanger changer(_new_tablet->tablet_schema(), *sc_params.desc_tbl,
352
0
                         sc_params.runtime_state);
353
354
0
    bool sc_sorting = false;
355
0
    bool sc_directly = false;
356
357
    // 1. Parse the Alter request and convert it into an internal representation
358
0
    RETURN_IF_ERROR(SchemaChangeJob::parse_request(sc_params, _base_tablet_schema.get(),
359
0
                                                   _new_tablet_schema.get(), &changer, &sc_sorting,
360
0
                                                   &sc_directly));
361
0
    if (!sc_sorting && !sc_directly && sc_params.alter_tablet_type == AlterTabletType::ROLLUP) {
362
0
        LOG(INFO) << "Don't support to add materialized view by linked schema change";
363
0
        return Status::InternalError(
364
0
                "Don't support to add materialized view by linked schema change");
365
0
    }
366
367
0
    LOG(INFO) << "schema change type, sc_sorting: " << sc_sorting
368
0
              << ", sc_directly: " << sc_directly << ", base_tablet=" << _base_tablet->tablet_id()
369
0
              << ", new_tablet=" << _new_tablet->tablet_id();
370
371
    // 2. Generate historical data converter
372
0
    auto sc_procedure = get_sc_procedure(
373
0
            changer, sc_sorting,
374
0
            _cloud_storage_engine.memory_limitation_bytes_per_thread_for_schema_change());
375
376
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.block", DBUG_BLOCK);
377
378
    // 3. Convert historical data
379
0
    bool already_exist_any_version = false;
380
0
    for (const auto& rs_reader : sc_params.ref_rowset_readers) {
381
0
        VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version();
382
383
0
        RowsetWriterContext context;
384
0
        context.txn_id = rs_reader->rowset()->txn_id();
385
0
        context.txn_expiration = _expiration;
386
0
        context.version = rs_reader->version();
387
0
        context.rowset_state = VISIBLE;
388
0
        context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
389
0
        context.tablet_schema = _new_tablet->tablet_schema();
390
0
        context.newest_write_timestamp = rs_reader->newest_write_timestamp();
391
0
        context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
392
0
        context.job_id = _job_id;
393
0
        context.write_file_cache = sc_params.output_to_file_cache;
394
0
        context.tablet = _new_tablet;
395
0
        if (!context.storage_resource) {
396
0
            return Status::InternalError("vault id not found, maybe not sync, vault id {}",
397
0
                                         sc_params.vault_id);
398
0
        }
399
400
0
        context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
401
        // TODO if support VerticalSegmentWriter, also need to handle cluster key primary key index
402
0
        bool vertical = false;
403
0
        if (sc_sorting && !_new_tablet->tablet_schema()->cluster_key_uids().empty()) {
404
            // see VBaseSchemaChangeWithSorting::_external_sorting
405
0
            vertical = true;
406
0
        }
407
0
        auto rowset_writer = DORIS_TRY(_new_tablet->create_rowset_writer(context, vertical));
408
409
0
        RowsetMetaSharedPtr existed_rs_meta;
410
0
        auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
411
0
                                                                  _job_id, &existed_rs_meta);
412
0
        if (!st.ok()) {
413
0
            if (st.is<ALREADY_EXIST>()) {
414
0
                LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
415
0
                          << _new_tablet->tablet_id();
416
                // Add already committed rowset to _output_rowsets.
417
0
                DCHECK(existed_rs_meta != nullptr);
418
0
                RowsetSharedPtr rowset;
419
                // schema is nullptr implies using RowsetMeta.tablet_schema
420
0
                RETURN_IF_ERROR(
421
0
                        RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset));
422
0
                _output_rowsets.push_back(std::move(rowset));
423
0
                already_exist_any_version = true;
424
0
                continue;
425
0
            } else {
426
0
                return st;
427
0
            }
428
0
        }
429
430
0
        st = sc_procedure->process(rs_reader, rowset_writer.get(), _new_tablet, _base_tablet,
431
0
                                   _base_tablet_schema, _new_tablet_schema);
432
0
        if (!st.ok()) {
433
0
            return Status::InternalError(
434
0
                    "failed to process schema change on rowset, version=[{}-{}], status={}",
435
0
                    rs_reader->version().first, rs_reader->version().second, st.to_string());
436
0
        }
437
438
0
        RowsetSharedPtr new_rowset;
439
0
        st = rowset_writer->build(new_rowset);
440
0
        if (!st.ok()) {
441
0
            return Status::InternalError("failed to build rowset, version=[{}-{}] status={}",
442
0
                                         rs_reader->version().first, rs_reader->version().second,
443
0
                                         st.to_string());
444
0
        }
445
446
0
        st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
447
0
                                                            &existed_rs_meta);
448
0
        if (!st.ok()) {
449
0
            if (st.is<ALREADY_EXIST>()) {
450
0
                LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
451
0
                          << _new_tablet->tablet_id();
452
                // Add already committed rowset to _output_rowsets.
453
0
                DCHECK(existed_rs_meta != nullptr);
454
0
                RowsetSharedPtr rowset;
455
                // schema is nullptr implies using RowsetMeta.tablet_schema
456
0
                RETURN_IF_ERROR(
457
0
                        RowsetFactory::create_rowset(nullptr, "", existed_rs_meta, &rowset));
458
0
                _output_rowsets.push_back(std::move(rowset));
459
0
                continue;
460
0
            } else {
461
0
                return st;
462
0
            }
463
0
        }
464
0
        _output_rowsets.push_back(std::move(new_rowset));
465
466
0
        VLOG_TRACE << "Successfully convert a history version " << rs_reader->version();
467
0
    }
468
0
    auto* sc_job = job.mutable_schema_change();
469
0
    if (!sc_params.ref_rowset_readers.empty()) {
470
0
        int64_t num_output_rows = 0;
471
0
        int64_t size_output_rowsets = 0;
472
0
        int64_t num_output_segments = 0;
473
0
        int64_t index_size_output_rowsets = 0;
474
0
        int64_t segment_size_output_rowsets = 0;
475
0
        for (auto& rs : _output_rowsets) {
476
0
            sc_job->add_txn_ids(rs->txn_id());
477
0
            sc_job->add_output_versions(rs->end_version());
478
0
            num_output_rows += rs->num_rows();
479
0
            size_output_rowsets += rs->total_disk_size();
480
0
            num_output_segments += rs->num_segments();
481
0
            index_size_output_rowsets += rs->index_disk_size();
482
0
            segment_size_output_rowsets += rs->data_disk_size();
483
0
        }
484
0
        sc_job->set_num_output_rows(num_output_rows);
485
0
        sc_job->set_size_output_rowsets(size_output_rowsets);
486
0
        sc_job->set_num_output_segments(num_output_segments);
487
0
        sc_job->set_num_output_rowsets(_output_rowsets.size());
488
0
        sc_job->set_index_size_output_rowsets(index_size_output_rowsets);
489
0
        sc_job->set_segment_size_output_rowsets(segment_size_output_rowsets);
490
0
    }
491
0
    _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
492
0
    sc_job->set_output_cumulative_point(_output_cumulative_point);
493
494
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", DBUG_BLOCK);
495
    // process delete bitmap if the table is MOW
496
0
    bool has_stop_token {false};
497
0
    bool should_clear_stop_token {true};
498
0
    Defer defer {[&]() {
499
0
        if (has_stop_token) {
500
0
            static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(
501
0
                    _new_tablet, should_clear_stop_token));
502
0
        }
503
0
    }};
504
0
    if (_new_tablet->enable_unique_key_merge_on_write()) {
505
0
        has_stop_token = true;
506
        // If there are historical versions of rowsets, we need to recalculate their delete
507
        // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
508
0
        int64_t start_calc_delete_bitmap_version =
509
                // [0-1] is a placeholder rowset, start from 2.
510
0
                already_exist_any_version ? 2 : sc_job->alter_version() + 1;
511
0
        RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
512
0
                                               start_calc_delete_bitmap_version, _initiator,
513
0
                                               sc_params.vault_id));
514
0
        sc_job->set_delete_bitmap_lock_initiator(_initiator);
515
0
    }
516
517
0
    cloud::FinishTabletJobResponse finish_resp;
518
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.test_conflict", {
519
0
        std::srand(static_cast<unsigned int>(std::time(nullptr)));
520
0
        int random_value = std::rand() % 100;
521
0
        if (random_value < 20) {
522
0
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test txn conflict");
523
0
        }
524
0
    });
525
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job", {
526
0
        LOG_INFO("inject retryable error before commit sc job, tablet={}",
527
0
                 _new_tablet->tablet_id());
528
0
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected retryable error");
529
0
    });
530
0
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.before.commit_job",
531
0
                    DBUG_BLOCK);
532
0
    auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
533
0
    if (!st.ok()) {
534
0
        if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
535
0
            st = _new_tablet->sync_rowsets();
536
0
            if (!st.ok()) {
537
0
                LOG_WARNING("failed to sync new tablet")
538
0
                        .tag("tablet_id", _new_tablet->tablet_id())
539
0
                        .error(st);
540
0
            }
541
0
            return Status::OK();
542
0
        }
543
0
        return st;
544
0
    } else {
545
0
        should_clear_stop_token = false;
546
0
    }
547
0
    const auto& stats = finish_resp.stats();
548
0
    {
549
        // to prevent the converted historical rowsets be replaced by rowsets written on new tablet
550
        // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
551
0
        std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
552
0
        std::unique_lock wlock(_new_tablet->get_header_lock());
553
        // Mirror MS behavior: delete rowsets in [2, alter_version] before adding
554
        // SC output rowsets to avoid stale compaction rowsets remaining visible.
555
0
        {
556
0
            int64_t alter_ver = sc_job->alter_version();
557
0
            std::vector<RowsetSharedPtr> to_delete;
558
0
            for (auto& [v, rs] : _new_tablet->rowset_map()) {
559
0
                if (v.first >= 2 && v.second <= alter_ver) {
560
0
                    to_delete.push_back(rs);
561
0
                }
562
0
            }
563
0
            if (!to_delete.empty()) {
564
0
                LOG_INFO(
565
0
                        "schema change: delete {} local rowsets in [2, {}] before adding SC "
566
0
                        "output, tablet_id={}, versions=[{}]",
567
0
                        to_delete.size(), alter_ver, _new_tablet->tablet_id(),
568
0
                        fmt::join(to_delete | std::views::transform([](const auto& rs) {
569
0
                                      return rs->version().to_string();
570
0
                                  }),
571
0
                                  ", "));
572
0
                _new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
573
0
            }
574
0
        }
575
0
        _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
576
0
        _new_tablet->set_cumulative_layer_point(_output_cumulative_point);
577
0
        _new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
578
0
                                             stats.num_rows(), stats.data_size());
579
0
        RETURN_IF_ERROR(_new_tablet->set_tablet_state(TABLET_RUNNING));
580
0
    }
581
0
    return Status::OK();
582
0
}
583
584
// Computes and persists the delete bitmap for a merge-on-write (MOW) table after
// schema change conversion. Works on a temporary CloudTablet clone so the real
// _new_tablet meta is not disturbed while incremental rowsets are replayed.
//
// @param alter_version                  highest version covered by the SC job; set on the
//                                       tmp tablet so it can fill hole rowsets above it
// @param start_calc_delete_bitmap_version first version whose delete bitmap must be
//                                       (re)calculated against the converted rowsets
// @param initiator                      id used for the compaction stop token and the
//                                       delete bitmap update lock
// @param vault_id                       storage vault used to resolve the storage
//                                       resource for persisting the bitmap
// @return Status::OK on success; any failure from MS RPCs or bitmap calculation
//
// NOTE(review): the caller appears responsible for releasing the stop token /
// update lock on the error paths here — confirm against the calling function.
Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
                                                    int64_t start_calc_delete_bitmap_version,
                                                    int64_t initiator,
                                                    const std::string& vault_id) {
    LOG_INFO("process mow table")
            .tag("new_tablet_id", _new_tablet->tablet_id())
            .tag("out_rowset_size", _output_rowsets.size())
            .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version)
            .tag("alter_version", alter_version);
    // Block concurrent compaction on the new tablet while the bitmap is being built.
    RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator));
    // Deep-copy the meta; the copy's delete bitmap starts empty and is rebuilt below.
    TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
    tmp_meta->delete_bitmap().delete_bitmap.clear();
    // Keep only version [0-1] rowset, other rowsets will be added in _output_rowsets
    auto& rs_metas = tmp_meta->all_mutable_rs_metas();
    for (auto it = rs_metas.begin(); it != rs_metas.end();) {
        const auto& rs_meta = it->second;
        if (rs_meta->version().first == 0 && rs_meta->version().second == 1) {
            ++it;
        } else {
            it = rs_metas.erase(it);
        }
    }

    std::shared_ptr<CloudTablet> tmp_tablet =
            std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
    {
        std::unique_lock wlock(tmp_tablet->get_header_lock());
        tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
        // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
        tmp_tablet->set_alter_version(alter_version);
    }

    // step 1, process incremental rowset without delete bitmap update lock
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
    int64_t max_version = tmp_tablet->max_version().second;
    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
              << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
              << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
    if (max_version >= start_calc_delete_bitmap_version) {
        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
                {start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
        DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
                        DBUG_BLOCK);
        // Re-add the SC output rowsets; sync_tablet_rowsets above may have altered
        // the tmp tablet's rowset set — presumably this restores the converted
        // rowsets so bitmap calculation sees them (TODO confirm).
        {
            std::unique_lock wlock(tmp_tablet->get_header_lock());
            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
        }
        for (auto rowset : ret.rowsets) {
            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
        }
    }

    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.before_new_inc.block",
                    DBUG_BLOCK);

    // step 2, process incremental rowset with delete bitmap update lock
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
    int64_t new_max_version = tmp_tablet->max_version().second;
    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
              << "incremental rowsets with lock, version: " << max_version + 1 << "-"
              << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
    if (new_max_version > max_version) {
        // Only the delta [max_version + 1, new_max_version] that landed between the
        // two syncs still needs processing; step 1 already covered the rest.
        auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
                {max_version + 1, new_max_version}, CaptureRowsetOps {}));
        {
            std::unique_lock wlock(tmp_tablet->get_header_lock());
            tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
        }
        for (auto rowset : ret.rowsets) {
            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
        }
    }

    // Fault injection: with probability `percent`, sleep `sleep` seconds while
    // holding the delete bitmap update lock (exercises lock-expiry handling).
    DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.inject_sleep", {
        auto p = dp->param("percent", 0.01);
        auto sleep_time = dp->param("sleep", 100);
        std::mt19937 gen {std::random_device {}()};
        std::bernoulli_distribution inject_fault {p};
        if (inject_fault(gen)) {
            LOG_INFO("injection sleep for {} seconds, tablet_id={}, sc job_id={}", sleep_time,
                     _new_tablet->tablet_id(), _job_id);
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
        }
    });

    auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
    auto storage_resource = _cloud_storage_engine.get_storage_resource(vault_id);
    // step4, store delete bitmap
    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap,
            &delete_bitmap, "", storage_resource, config::delete_bitmap_store_write_version));

    // Install the freshly computed bitmap on the real new tablet's meta.
    _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
    return Status::OK();
}
681
682
0
void CloudSchemaChangeJob::clean_up_on_failure() {
683
0
    if (_new_tablet == nullptr) {
684
0
        return;
685
0
    }
686
0
    if (_new_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
687
0
        _new_tablet->enable_unique_key_merge_on_write()) {
688
0
        _cloud_storage_engine.meta_mgr().remove_delete_bitmap_update_lock(
689
0
                _new_tablet->table_id(), SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, _initiator,
690
0
                _new_tablet->tablet_id());
691
0
    }
692
0
    for (const auto& output_rs : _output_rowsets) {
693
0
        if (output_rs.use_count() > 2) {
694
0
            LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has "
695
0
                         << output_rs.use_count()
696
0
                         << " references. File Cache won't be recycled when query is using it.";
697
0
            return;
698
0
        }
699
0
        output_rs->clear_cache();
700
0
    }
701
0
}
702
703
bool CloudSchemaChangeJob::_should_cache_sc_output(
704
0
        const std::vector<RowsetSharedPtr>& input_rowsets) {
705
0
    int64_t total_size = 0;
706
0
    int64_t cached_index_size = 0;
707
0
    int64_t cached_data_size = 0;
708
709
0
    for (const auto& rs : input_rowsets) {
710
0
        const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
711
0
        total_size += rs_meta->total_disk_size();
712
0
        cached_index_size += rs->approximate_cache_index_size();
713
0
        cached_data_size += rs->approximate_cached_data_size();
714
0
    }
715
716
0
    double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;
717
718
0
    LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
719
0
              << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
720
0
              << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
721
0
              << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
722
0
              << ", min_hit_ratio_threshold="
723
0
              << config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache="
724
0
              << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);
725
726
0
    if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
727
0
        return true;
728
0
    }
729
730
0
    return false;
731
0
}
732
733
} // namespace doris