Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_delete_task.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_delete_task.h"
19
20
#include <gen_cpp/AgentService_types.h>
21
#include <thrift/protocol/TDebugProtocol.h>
22
23
#include "cloud/cloud_meta_mgr.h"
24
#include "cloud/cloud_storage_engine.h"
25
#include "cloud/cloud_tablet.h"
26
#include "cloud/cloud_tablet_mgr.h"
27
#include "common/logging.h"
28
#include "storage/delete/delete_handler.h"
29
30
namespace doris {
31
using namespace ErrorCode;
32
33
0
/// Processes a delete push request for a cloud tablet.
///
/// Steps, in order:
///   1. Validate that the request carries a transaction id and a schema version.
///   2. Reject the request if the tablet's approximate rowset count already
///      exceeds the tablet's configured maximum.
///   3. Build a delete predicate from `request.delete_conditions` against a
///      schema derived from the tablet schema and the request's columns/indexes.
///   4. Build a rowset through a rowset writer (no rows are added in this
///      function) and attach the delete predicate to its meta.
///   5. Prepare and commit the rowset through the meta manager.
///   6. On successful commit, bump the tablet's approximate rowset counters and,
///      for merge-on-write tablets, register the txn info in the txn delete
///      bitmap cache.
///
/// @param engine  storage engine providing the tablet manager, meta manager,
///                storage resources and the txn delete bitmap cache.
/// @param request push request describing the delete.
/// @return OK on success; otherwise the first error encountered
///         (InvalidArgument, InternalError, TOO_MANY_VERSION, or an error
///         propagated from one of the called components).
Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& request) {
    VLOG_DEBUG << "begin to process delete data. request=" << ThriftDebugString(request);

    if (!request.__isset.transaction_id) {
        return Status::InvalidArgument("transaction_id is not set");
    }

    auto tablet = DORIS_TRY(engine.tablet_mgr().get_tablet(request.tablet_id));

    if (!request.__isset.schema_version) {
        return Status::InternalError("No valid schema version in request, tablet_id={}",
                                     tablet->tablet_id());
    }

    // Record the wall-clock time (ms since epoch) of this load on the tablet.
    using namespace std::chrono;
    tablet->last_load_time_ms =
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

    // check if version number exceed limit
    const int32_t max_version_config = tablet->max_version_config();
    // fetch_add with 0 reads the current approximate rowset count without changing it.
    const auto version_count = tablet->fetch_add_approximate_num_rowsets(0);
    if (version_count > max_version_config) {
        LOG_WARNING("tablet exceeds max version num limit")
                .tag("limit", max_version_config)
                .tag("version_count", version_count)
                .tag("tablet_id", tablet->tablet_id());
        // Report the observed count (not the limit) in `versions={}`.
        return Status::Error<TOO_MANY_VERSION>(
                "too many versions, versions={} tablet={}. Please reduce the frequency of loading "
                "data or adjust the max_tablet_version_num or time_series_max_tablet_version_num "
                "in be.conf to a larger value.",
                version_count, tablet->tablet_id());
    }

    // check delete condition if push for delete
    DeletePredicatePB del_pred;
    auto tablet_schema = std::make_shared<TabletSchema>();
    // FIXME(plat1ko): Rewrite columns updating logic
    tablet_schema->update_tablet_columns(*tablet->tablet_schema(), request.columns_desc);
    tablet_schema->update_indexes_from_thrift(request.index_list);

    tablet_schema->set_schema_version(request.schema_version);
    RETURN_IF_ERROR(DeleteHandler::generate_delete_predicate(*tablet_schema,
                                                             request.delete_conditions, &del_pred));

    // The delete rowset uses an all-zero load id.
    PUniqueId load_id;
    load_id.set_hi(0);
    load_id.set_lo(0);

    RowsetWriterContext context;
    context.storage_resource = engine.get_storage_resource(request.storage_vault_id);
    if (!context.storage_resource) {
        return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                     request.storage_vault_id);
    }

    context.txn_id = request.transaction_id;
    context.load_id = load_id;
    context.rowset_state = PREPARED;
    context.segments_overlap = OVERLAP_UNKNOWN;
    context.tablet_schema = tablet_schema;
    // ATTN: `request.timeout` is always 0 in current version, so we MUST ensure that the retention time of the
    //  recycler is much longer than the duration of the push task
    context.txn_expiration = ::time(nullptr) + request.timeout;
    auto rowset_writer = DORIS_TRY(tablet->create_rowset_writer(context, false));

    RowsetSharedPtr rowset;
    RETURN_IF_ERROR(rowset_writer->build(rowset));
    rowset->rowset_meta()->set_delete_predicate(std::move(del_pred));

    auto st = engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), "");
    if (!st.ok()) {
        LOG(WARNING) << "failed to prepare rowset, status=" << st.to_string();
        return st;
    }

    st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
    if (!st.ok()) {
        // Do not touch tablet stats or the txn delete bitmap cache when the
        // commit did not succeed; handle it like a prepare failure.
        LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
        return st;
    }

    // Update tablet stats
    tablet->fetch_add_approximate_num_rowsets(1);
    tablet->fetch_add_approximate_cumu_num_rowsets(1);

    // TODO(liaoxin) delete operator don't send calculate delete bitmap task from fe,
    //  then we don't need to set_txn_related_delete_bitmap here.
    if (tablet->enable_unique_key_merge_on_write()) {
        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
        RowsetIdUnorderedSet rowset_ids;
        engine.txn_delete_bitmap_cache().set_tablet_txn_info(
                request.transaction_id, tablet->tablet_id(), delete_bitmap, rowset_ids, rowset,
                request.timeout, nullptr);
    }

    return st;
}
123
124
} // namespace doris