Coverage Report

Created: 2025-05-20 23:17

/root/doris/be/src/olap/cumulative_compaction.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/cumulative_compaction.h"
19
20
#include <memory>
21
#include <mutex>
22
#include <ostream>
23
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "olap/cumulative_compaction_policy.h"
27
#include "olap/cumulative_compaction_time_series_policy.h"
28
#include "olap/olap_define.h"
29
#include "olap/rowset/rowset_meta.h"
30
#include "olap/tablet.h"
31
#include "runtime/thread_context.h"
32
#include "util/doris_metrics.h"
33
#include "util/time.h"
34
#include "util/trace.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
39
void CumulativeCompaction::find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
40
27
                                                            std::vector<Version>* missing_version) {
41
27
    if (rowsets->empty()) {
42
0
        return;
43
0
    }
44
45
27
    RowsetSharedPtr prev_rowset = rowsets->front();
46
27
    size_t i = 1;
47
27
    int max_start = 0;
48
27
    int max_length = 1;
49
50
27
    int start = 0;
51
27
    int length = 1;
52
603
    for (; i < rowsets->size(); ++i) {
53
576
        RowsetSharedPtr rowset = (*rowsets)[i];
54
576
        if (rowset->start_version() != prev_rowset->end_version() + 1) {
55
6
            if (missing_version != nullptr) {
56
6
                missing_version->push_back(prev_rowset->version());
57
6
                missing_version->push_back(rowset->version());
58
6
            }
59
6
            start = i;
60
6
            length = 1;
61
570
        } else {
62
570
            length++;
63
570
        }
64
65
576
        if (length > max_length) {
66
560
            max_start = start;
67
560
            max_length = length;
68
560
        }
69
70
576
        prev_rowset = rowset;
71
576
    }
72
27
    *rowsets = {rowsets->begin() + max_start, rowsets->begin() + max_start + max_length};
73
27
}
74
75
// Creates a cumulative compaction task for `tablet`. The label passed to
// CompactionMixin has the form "CumulativeCompaction:<tablet_id>".
CumulativeCompaction::CumulativeCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
        : CompactionMixin(engine, tablet,
                          std::string("CumulativeCompaction:") +
                                  std::to_string(tablet->tablet_id())) {}

// Defaulted: all cleanup is left to member and base-class destructors.
CumulativeCompaction::~CumulativeCompaction() = default;
80
81
20
// Prepares this cumulative compaction:
//   1. fails if the tablet did not initialize successfully;
//   2. try-locks the tablet's cumulative compaction mutex (non-blocking) and fails
//      if another cumulative compaction already holds it;
//   3. recomputes the cumulative point and picks the input rowsets.
// Whenever the function exits with a non-OK status, the deferred hook records that
// status string and a failure timestamp on the tablet.
Status CumulativeCompaction::prepare_compact() {
    Status status;
    Defer record_failure([&] {
        if (status.ok()) {
            return;
        }
        tablet()->set_last_cumu_compaction_status(status.to_string());
        tablet()->set_last_cumu_compaction_failure_time(UnixMillis());
    });

    if (!tablet()->init_succeeded()) {
        status = Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet init failed");
        return status;
    }

    // Non-blocking acquisition: bail out instead of waiting on a concurrent compaction.
    std::unique_lock<std::mutex> cumu_lock(tablet()->get_cumulative_compaction_lock(),
                                           std::try_to_lock);
    if (!cumu_lock.owns_lock()) {
        status = Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
        return status;
    }

    tablet()->calculate_cumulative_point();
    VLOG_CRITICAL << "after calculate, current cumulative point is "
                  << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    status = pick_rowsets_to_compact();
    RETURN_IF_ERROR(status);

    COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());

    status = Status::OK();
    return status;
}
114
115
0
// Runs the cumulative compaction prepared by prepare_compact().
//
// A debug point ("CumulativeCompaction::execute_compact.block") can stall execution
// for a chosen tablet id. The tablet's cumulative compaction mutex is try-locked
// again (non-blocking), the shared CompactionMixin::execute_compact() does the work,
// and on success the cumulative point is advanced through the tablet's policy and the
// compaction metrics are bumped.
//
// On every exit the deferred hook stores the final status string on the tablet and
// stamps either the failure time or, on success, the success time. The success time
// is skipped for the time-series policy when the output rowset has no segments.
Status CumulativeCompaction::execute_compact() {
    DBUG_EXECUTE_IF("CumulativeCompaction::execute_compact.block", {
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
        if (target_tablet_id == _tablet->tablet_id()) {
            LOG(INFO) << "start debug block "
                      << "CumulativeCompaction::execute_compact.block";
            while (DebugPoints::instance()->is_enable(
                    "CumulativeCompaction::execute_compact.block")) {
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
            }
            LOG(INFO) << "end debug block "
                      << "CumulativeCompaction::execute_compact.block";
        }
    })

    Status status;
    Defer record_outcome([&] {
        tablet()->set_last_cumu_compaction_status(status.to_string());
        if (!status.ok()) {
            tablet()->set_last_cumu_compaction_failure_time(UnixMillis());
            return;
        }
        // TIME_SERIES_POLICY, generating an empty rowset doesn't need to update the timestamp.
        // `&&` short-circuits, so num_segments() is only consulted for that policy.
        const bool empty_time_series_output =
                tablet()->tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY &&
                _output_rowset->num_segments() == 0;
        if (!empty_time_series_output) {
            tablet()->set_last_cumu_compaction_success_time(UnixMillis());
        }
    });

    std::unique_lock<std::mutex> cumu_lock(tablet()->get_cumulative_compaction_lock(),
                                           std::try_to_lock);
    if (!cumu_lock.owns_lock()) {
        status = Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
        return status;
    }

    SCOPED_ATTACH_TASK(_mem_tracker);

    status = CompactionMixin::execute_compact();
    RETURN_IF_ERROR(status);

    DCHECK_EQ(_state, CompactionState::SUCCESS);

    tablet()->cumulative_compaction_policy()->update_cumulative_point(
            tablet(), _input_rowsets, _output_rowset, _last_delete_version);
    VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
                  << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    auto metrics = DorisMetrics::instance();
    metrics->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
    metrics->cumulative_compaction_bytes_total->increment(_input_rowsets_total_size);

    status = Status::OK();
    return status;
}
168
169
20
// Chooses the input rowsets for this cumulative compaction. The result goes into
// `_input_rowsets`, and the policy also fills `_last_delete_version`.
//
//   1. Fetch the tablet's cumulative-compaction candidates and keep only the longest
//      version-contiguous run (any gaps are logged).
//   2. Shrink the score budget (max_score) when process memory is above 80% of the
//      soft limit or the last compaction failed with MEM_LIMIT_EXCEEDED.
//   3. Let the tablet's cumulative compaction policy pick the actual inputs.
//   4. If nothing was picked:
//      - a delete version was met: bump the cumulative point past it and return
//        CUMULATIVE_MEET_DELETE_VERSION;
//      - otherwise, if neither cumulative nor base compaction has succeeded for longer
//        than pick_rowset_to_compact_interval_sec, either compact all candidates (when
//        any is segment-overlapping, returning OK) or advance the cumulative point;
//        the very first time, just initialise both success timestamps.
//      Then return CUMULATIVE_NO_SUITABLE_VERSION.
Status CumulativeCompaction::pick_rowsets_to_compact() {
    auto candidate_rowsets = tablet()->pick_candidate_rowsets_to_cumulative_compaction();
    if (candidate_rowsets.empty()) {
        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
    }

    // candidate_rowsets may not be continuous
    // So we need to choose the longest continuous path from it.
    std::vector<Version> missing_versions;
    find_longest_consecutive_version(&candidate_rowsets, &missing_versions);
    if (!missing_versions.empty()) {
        // Gaps are reported as (prev rowset version, next rowset version) pairs.
        DCHECK(missing_versions.size() % 2 == 0);
        LOG(WARNING) << "There are missed versions among rowsets. "
                     << "total missed version size: " << missing_versions.size() / 2
                     << ", first missed version prev rowset version=" << missing_versions[0]
                     << ", first missed version next rowset version=" << missing_versions[1]
                     << ", tablet=" << _tablet->tablet_id();
    }

    int64_t max_score = config::cumulative_compaction_max_deltas;
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
    if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
        // Under memory pressure compact fewer deltas at once, but never fewer than
        // min_deltas + 1. The factor is a runtime-mutable config: read it once and
        // treat a non-positive value as "no reduction" instead of dividing by zero.
        const int64_t configured_factor = config::cumulative_compaction_max_deltas_factor;
        const int64_t factor = configured_factor > 0 ? configured_factor : 1;
        max_score = std::max<int64_t>(config::cumulative_compaction_max_deltas / factor,
                                      config::cumulative_compaction_min_deltas + 1);
    }

    size_t compaction_score = 0;
    tablet()->cumulative_compaction_policy()->pick_input_rowsets(
            tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
            &_input_rowsets, &_last_delete_version, &compaction_score,
            _allow_delete_in_cumu_compaction);

    // Cumulative compaction will process with at least 1 rowset.
    // So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
    if (_input_rowsets.empty()) {
        if (_last_delete_version.first != -1) {
            // we meet a delete version, should increase the cumulative point to let base compaction handle the delete version.
            // plus 1 to skip the delete version.
            // NOTICE: after that, the cumulative point may be larger than max version of this tablet, but it doesn't matter.
            tablet()->set_cumulative_layer_point(_last_delete_version.first + 1);
            LOG_INFO(
                    "cumulative compaction meet delete rowset, increase cumu point without "
                    "other "
                    "operation.")
                    .tag("tablet id:", tablet()->tablet_id())
                    .tag("after cumulative compaction, cumu point:",
                         tablet()->cumulative_layer_point());
            return Status::Error<CUMULATIVE_MEET_DELETE_VERSION>(
                    "cumulative compaction meet delete version");
        }

        // we did not meet any delete version. which means compaction_score is not enough to do cumulative compaction.
        // We should wait until there are more rowsets to come, and keep the cumulative point unchanged.
        // But in order to avoid the stall of compaction because no new rowset arrives later, we should increase
        // the cumulative point after waiting for a long time, to ensure that the base compaction can continue.

        // check both last success time of base and cumulative compaction
        int64_t now = UnixMillis();
        int64_t last_cumu = tablet()->last_cumu_compaction_success_time();
        int64_t last_base = tablet()->last_base_compaction_success_time();
        if (last_cumu != 0 || last_base != 0) {
            int64_t interval_threshold = config::pick_rowset_to_compact_interval_sec * 1000;
            int64_t cumu_interval = now - last_cumu;
            int64_t base_interval = now - last_base;
            if (cumu_interval > interval_threshold && base_interval > interval_threshold) {
                // before increasing cumulative point, we should make sure all rowsets are non-overlapping.
                // if at least one rowset is overlapping, we should compact them first.
                for (const auto& rs : candidate_rowsets) {
                    if (rs->rowset_meta()->is_segments_overlapping()) {
                        _input_rowsets = candidate_rowsets;
                        return Status::OK();
                    }
                }

                // all candidate rowsets are non-overlapping, increase the cumulative point
                tablet()->set_cumulative_layer_point(candidate_rowsets.back()->start_version() + 1);
            }
        } else {
            // init the compaction success time for first time
            if (last_cumu == 0) {
                tablet()->set_last_cumu_compaction_success_time(now);
            }

            if (last_base == 0) {
                tablet()->set_last_base_compaction_success_time(now);
            }
        }

        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets is empty");
    }

    return Status::OK();
}
264
265
} // namespace doris