Coverage Report

Created: 2026-01-07 22:41

/root/doris/be/src/olap/full_compaction.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/full_compaction.h"
19
20
#include <glog/logging.h>
21
#include <time.h>
22
23
#include <memory>
24
#include <mutex>
25
#include <ostream>
26
#include <shared_mutex>
27
28
#include "common/config.h"
29
#include "common/status.h"
30
#include "olap/base_tablet.h"
31
#include "olap/compaction.h"
32
#include "olap/cumulative_compaction_policy.h"
33
#include "olap/olap_common.h"
34
#include "olap/olap_define.h"
35
#include "olap/rowset/beta_rowset.h"
36
#include "olap/rowset/rowset.h"
37
#include "olap/schema_change.h"
38
#include "olap/tablet_meta.h"
39
#include "runtime/thread_context.h"
40
#include "util/debug_points.h"
41
#include "util/thread.h"
42
#include "util/trace.h"
43
44
namespace doris {
45
using namespace ErrorCode;
46
47
// Creates a full-compaction task bound to `tablet`. The label forwarded to CompactionMixin
// is the literal "FullCompaction:" followed by the decimal tablet id.
FullCompaction::FullCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
        : CompactionMixin(engine, tablet,
                          std::string("FullCompaction:") + std::to_string(tablet->tablet_id())) {}
50
51
0
// Clears the tablet's "full compaction running" flag when the task object goes away.
// The flag is reset unconditionally, whether or not prepare_compact() ever set it.
FullCompaction::~FullCompaction() {
    auto&& owning_tablet = tablet();
    owning_tablet->set_is_full_compaction_running(false);
}
54
55
0
// Checks that the tablet is usable and picks the input rowsets for a full compaction.
//
// Returns the first error encountered (tablet not initialised, or no suitable rowsets);
// in that case the status text and the failure time are recorded on the tablet.
// On the non-error path the tablet is flagged as running a full compaction.
// The base and cumulative compaction locks are held only until this function returns.
Status FullCompaction::prepare_compact() {
    Status st;
    // Fires at scope exit and inspects whatever `st` holds at that moment.
    Defer record_failure([&] {
        if (st.ok()) {
            return;
        }
        tablet()->set_last_full_compaction_status(st.to_string());
        tablet()->set_last_full_compaction_failure_time(UnixMillis());
    });

    if (!tablet()->init_succeeded()) {
        st = Status::Error<INVALID_ARGUMENT, false>("Full compaction init failed");
        return st;
    }

    // Base lock first, then cumulative lock: same order as execute_compact().
    std::unique_lock base_lock(tablet()->get_base_compaction_lock());
    std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock());
    tablet()->set_is_full_compaction_running(true);

    // Debug-point hook: moves the cumulative layer point past the current max version.
    DBUG_EXECUTE_IF("FullCompaction.prepare_compact.set_cumu_point",
                    { tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); })

    // Select the rowsets to merge; bail out with the picker's status on failure.
    st = pick_rowsets_to_compact();
    RETURN_IF_ERROR(st);

    st = Status::OK();
    return st;
}
82
83
0
// Runs the compaction through CompactionMixin::execute_compact() while holding the base
// and cumulative compaction locks, then lets the tablet's cumulative compaction policy
// update the compaction level and the cumulative point.
//
// Whatever the outcome, the final status text is stored on the tablet together with
// either the success time or the failure time.
Status FullCompaction::execute_compact() {
    Status st;
    // Fires at scope exit and inspects whatever `st` holds at that moment.
    Defer record_outcome([&] {
        tablet()->set_last_full_compaction_status(st.to_string());
        if (st.ok()) {
            tablet()->set_last_full_compaction_success_time(UnixMillis());
        } else {
            tablet()->set_last_full_compaction_failure_time(UnixMillis());
        }
    });

    // Base lock first, then cumulative lock: same order as prepare_compact().
    std::unique_lock base_lock(tablet()->get_base_compaction_lock());
    std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock());

    SCOPED_ATTACH_TASK(_mem_tracker);

    st = CompactionMixin::execute_compact();
    RETURN_IF_ERROR(st);

    auto&& policy = tablet()->cumulative_compaction_policy();
    policy->update_compaction_level(tablet(), _input_rowsets, _output_rowset);

    Version last_version = _input_rowsets.back()->version();
    policy->update_cumulative_point(tablet(), _input_rowsets, _output_rowset, last_version);

    // NOTE(review): the message says "cumulative compaction" although this is the full
    // compaction path; text kept unchanged here - confirm whether it should be reworded.
    VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
                  << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    st = Status::OK();
    return st;
}
113
114
0
// Fills `_input_rowsets` with the tablet's full-compaction candidates and validates them.
//
// Returns the error from the continuity check or from _check_all_version() if either
// fails, and FULL_NO_SUITABLE_VERSION when there is nothing worth compacting.
Status FullCompaction::pick_rowsets_to_compact() {
    _input_rowsets = tablet()->pick_candidate_rowsets_to_full_compaction();
    RETURN_IF_ERROR(check_version_continuity(_input_rowsets));
    RETURN_IF_ERROR(_check_all_version(_input_rowsets));

    const auto candidate_count = _input_rowsets.size();
    const bool too_few_rowsets = candidate_count <= 1;
    // The tablet holds only [0-1] and [2-y]; the original comment states that [0-1] has
    // no data in this situation, so a full compaction would be pointless.
    const bool only_initial_plus_one =
            candidate_count == 2 && _input_rowsets[0]->end_version() == 1;

    if (too_few_rowsets || only_initial_plus_one) {
        return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version");
    }
    return Status::OK();
}
130
131
0
// Swaps the input rowsets for the compaction output in the tablet and saves the meta.
//
// For unique-key merge-on-write tablets, delete bitmaps are first computed against the
// output rowset for every rowset newer than it: those visible up to the tablet's max
// version before the locks are taken, and any with a higher start version once the
// rowset-update and header locks are held. If such a tablet is in TABLET_NOTREADY state
// the function returns OK without modifying anything.
Status FullCompaction::modify_rowsets() {
    std::vector<RowsetSharedPtr> output_rowsets {_output_rowset};

    const bool merge_on_write = _tablet->keys_type() == KeysType::UNIQUE_KEYS &&
                                _tablet->enable_unique_key_merge_on_write();
    if (!merge_on_write) {
        // Plain path: no delete bitmap work, just replace the rowsets under both locks.
        DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.before.block", DBUG_BLOCK);
        std::lock_guard<std::mutex> rowset_update_wlock(tablet()->get_rowset_update_lock());
        std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
        RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
        DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
        tablet()->save_meta();
        return Status::OK();
    }

    // tablet is under alter process. The delete bitmap will be calculated after conversion.
    if (_tablet->tablet_state() == TABLET_NOTREADY) {
        LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
                  << _tablet->tablet_id();
        return Status::OK();
    }

    int64_t max_version = tablet()->max_version().second;
    DCHECK(max_version >= _output_rowset->version().second);

    // Rowsets that landed after the compaction output, up to the max version seen above.
    std::vector<RowsetSharedPtr> newer_rowsets {};
    if (max_version > _output_rowset->version().second) {
        auto ret = DORIS_TRY(_tablet->capture_consistent_rowsets_unlocked(
                {_output_rowset->version().second + 1, max_version}, CaptureRowsetOps {}));
        newer_rowsets = std::move(ret.rowsets);
    }

    for (const auto& newer : newer_rowsets) {
        const int64_t start_version = newer->rowset_meta()->start_version();
        RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(newer, _output_rowset, start_version,
                                                            _output_rs_writer.get()));
    }

    DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.before.block", DBUG_BLOCK);
    std::lock_guard rowset_update_lock(tablet()->get_rowset_update_lock());
    std::lock_guard header_lock(_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);

    // With both locks held, handle every rowset whose start version is above max_version.
    for (const auto& [version, published_rowset] : tablet()->rowset_map()) {
        const int64_t start_version = version.first;
        if (start_version <= max_version) {
            continue;
        }
        RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(
                published_rowset, _output_rowset, start_version, _output_rs_writer.get()));
    }

    RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true));
    DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
    tablet()->save_meta();
    return Status::OK();
}
183
184
0
// Verifies that `rowsets` spans the tablet's whole version range: the first rowset must
// start at version 0 and the last rowset's version must equal the tablet's max version.
//
// Returns FULL_MISS_VERSION when `rowsets` is empty or either end does not match.
Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& rowsets) {
    if (rowsets.empty()) {
        return Status::Error<FULL_MISS_VERSION, false>(
                "There is no input rowset when do full compaction");
    }

    const RowsetSharedPtr& head = rowsets.front();
    const RowsetSharedPtr& tail = rowsets.back();
    auto max_version = tablet()->max_version();

    const bool tail_mismatch = tail->version() != max_version;
    const bool head_mismatch = head->version().first != 0;
    if (!tail_mismatch && !head_mismatch) {
        return Status::OK();
    }

    return Status::Error<FULL_MISS_VERSION, false>(
            "Full compaction rowsets' versions not equal to all exist rowsets' versions. "
            "full compaction rowsets max version={}-{}"
            ", current rowsets max version={}-{}"
            ", full compaction rowsets min version={}-{}, current rowsets min version=0-1",
            tail->start_version(), tail->end_version(), max_version.first, max_version.second,
            head->start_version(), head->end_version());
}
203
204
// Computes the delete bitmap that `published_rowset` produces against `rowset` and merges
// it into the tablet meta's delete bitmap under `cur_version`.
//
// published_rowset: rowset whose segments are loaded and scanned. It is downcast to
//                   BetaRowset with an unchecked static_pointer_cast, so callers must pass
//                   a BetaRowset.
// rowset:           the only rowset passed to BaseTablet::calc_delete_bitmap as the
//                   specified rowset set (callers in this file pass the compaction output).
// cur_version:      version used as the third key component when merging bitmap entries.
// rowset_writer:    forwarded unchanged to BaseTablet::calc_delete_bitmap.
//
// Returns the first error from loading segments or from calc_delete_bitmap, otherwise OK.
Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
                                                           const RowsetSharedPtr& rowset,
                                                           int64_t cur_version,
                                                           RowsetWriter* rowset_writer) {
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(
            std::static_pointer_cast<BetaRowset>(published_rowset)->load_segments(&segments));

    // Scratch bitmap; only the entries accepted below reach the tablet meta.
    DeleteBitmapPtr delete_bitmap =
            std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
    std::vector<RowsetSharedPtr> specified_rowsets {rowset};

    OlapStopWatch watch;
    RETURN_IF_ERROR(BaseTablet::calc_delete_bitmap(_tablet, published_rowset, segments,
                                                   specified_rowsets, delete_bitmap, cur_version,
                                                   nullptr, rowset_writer));

    // The seed must be size_t: std::accumulate keeps its running total in the type of the
    // initial value, so an `int` literal would truncate totals above INT_MAX.
    const size_t total_rows = std::accumulate(
            segments.begin(), segments.end(), size_t {0},
            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });

    // Entries keyed with INVALID_SEGMENT_ID are skipped; the rest are merged under a key
    // that keeps the first two components and substitutes cur_version as the third.
    for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
        if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
            _tablet->tablet_meta()->delete_bitmap()->merge(
                    {std::get<0>(k), std::get<1>(k), cur_version}, v);
        }
    }

    VLOG_DEBUG << "[Full compaction] construct delete bitmap tablet: " << _tablet->tablet_id()
               << ", published rowset version: [" << published_rowset->version().first << "-"
               << published_rowset->version().second << "]"
               << ", full compaction rowset version: [" << rowset->version().first << "-"
               << rowset->version().second << "]"
               << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
    return Status::OK();
}
236
237
} // namespace doris