Coverage Report

Created: 2024-11-21 15:53

/root/doris/be/src/olap/cumulative_compaction.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/cumulative_compaction.h"
19
20
#include <memory>
21
#include <mutex>
22
#include <ostream>
23
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "olap/cumulative_compaction_policy.h"
27
#include "olap/cumulative_compaction_time_series_policy.h"
28
#include "olap/olap_define.h"
29
#include "olap/rowset/rowset_meta.h"
30
#include "olap/tablet.h"
31
#include "runtime/thread_context.h"
32
#include "util/doris_metrics.h"
33
#include "util/time.h"
34
#include "util/trace.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
39
void CumulativeCompaction::find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets,
40
17
                                                            std::vector<Version>* missing_version) {
41
17
    if (rowsets->empty()) {
42
0
        return;
43
0
    }
44
45
17
    RowsetSharedPtr prev_rowset = rowsets->front();
46
17
    size_t i = 1;
47
17
    int max_start = 0;
48
17
    int max_length = 1;
49
50
17
    int start = 0;
51
17
    int length = 1;
52
323
    for (; i < rowsets->size(); ++i) {
53
306
        RowsetSharedPtr rowset = (*rowsets)[i];
54
306
        if (rowset->start_version() != prev_rowset->end_version() + 1) {
55
6
            if (missing_version != nullptr) {
56
6
                missing_version->push_back(prev_rowset->version());
57
6
                missing_version->push_back(rowset->version());
58
6
            }
59
6
            start = i;
60
6
            length = 1;
61
300
        } else {
62
300
            length++;
63
300
        }
64
65
306
        if (length > max_length) {
66
290
            max_start = start;
67
290
            max_length = length;
68
290
        }
69
70
306
        prev_rowset = rowset;
71
306
    }
72
17
    *rowsets = {rowsets->begin() + max_start, rowsets->begin() + max_start + max_length};
73
17
}
74
75
// Constructs a cumulative compaction for `tablet`. The third argument passed to
// CompactionMixin is the text "CumulativeCompaction:" followed by the tablet id.
CumulativeCompaction::CumulativeCompaction(StorageEngine& engine, const TabletSharedPtr& tablet)
        : CompactionMixin(
                  engine, tablet,
                  std::string("CumulativeCompaction:") + std::to_string(tablet->tablet_id())) {
}
78
79
13
// Destructor defined out of line with the compiler-generated behaviour; no custom
// teardown is performed here.
CumulativeCompaction::~CumulativeCompaction() = default;
80
81
10
// Prepares a cumulative compaction run.
//
// Steps, in order:
//   1. reject tablets whose initialization did not succeed;
//   2. try (without blocking) to take the tablet's cumulative compaction lock,
//      which is then held until this function returns;
//   3. recalculate the tablet's cumulative point;
//   4. pick the input rowsets and record how many were chosen.
//
// @return Status::OK() on success; CUMULATIVE_INVALID_PARAMETERS if the tablet is
//         not initialized; TRY_LOCK_FAILED if the lock is already taken; otherwise
//         whatever error pick_rowsets_to_compact() reports.
Status CumulativeCompaction::prepare_compact() {
    if (!tablet()->init_succeeded()) {
        return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet init failed");
    }

    std::unique_lock<std::mutex> cumu_guard(tablet()->get_cumulative_compaction_lock(),
                                            std::try_to_lock);
    // unique_lock converts to true exactly when it owns the mutex.
    if (!cumu_guard) {
        return Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
    }

    tablet()->calculate_cumulative_point();
    VLOG_CRITICAL << "after calculate, current cumulative point is "
                  << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    RETURN_IF_ERROR(pick_rowsets_to_compact());
    COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());
    return Status::OK();
}
101
102
0
// Runs the compaction and then updates the tablet's cumulative-compaction bookkeeping.
//
// The tablet's cumulative compaction lock is acquired with try-lock and held for the
// whole call. After CompactionMixin::execute_compact() succeeds this function:
//   - updates the compaction level through the tablet's cumulative compaction policy
//     when the tablet's time-series compaction level threshold is at least 2;
//   - updates the cumulative point through that same policy;
//   - refreshes the last successful cumulative compaction time, except when the
//     tablet uses the time-series policy and the output rowset has no segments;
//   - bumps the cumulative compaction delta and byte metrics.
//
// @return Status::OK() on success; TRY_LOCK_FAILED if the lock is already taken;
//         otherwise the error returned by CompactionMixin::execute_compact().
Status CumulativeCompaction::execute_compact() {
    std::unique_lock<std::mutex> cumu_guard(tablet()->get_cumulative_compaction_lock(),
                                            std::try_to_lock);
    // unique_lock converts to true exactly when it owns the mutex.
    if (!cumu_guard) {
        return Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
    }

    SCOPED_ATTACH_TASK(_mem_tracker);

    RETURN_IF_ERROR(CompactionMixin::execute_compact());
    DCHECK_EQ(_state, CompactionState::SUCCESS);

    if (tablet()->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
        tablet()->cumulative_compaction_policy()->update_compaction_level(tablet(), _input_rowsets,
                                                                          _output_rowset);
    }
    tablet()->cumulative_compaction_policy()->update_cumulative_point(
            tablet(), _input_rowsets, _output_rowset, _last_delete_version);
    VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
                  << tablet()->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    // Under TIME_SERIES_POLICY, producing an empty rowset must not refresh the timestamp.
    const bool empty_time_series_output =
            tablet()->tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY &&
            _output_rowset->num_segments() == 0;
    if (!empty_time_series_output) {
        tablet()->set_last_cumu_compaction_success_time(UnixMillis());
    }

    auto* metrics = DorisMetrics::instance();
    metrics->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
    DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(
            _input_rowsets_total_size);

    return Status::OK();
}
133
134
10
// Chooses the rowsets this cumulative compaction will merge and stores them in
// _input_rowsets.
//
// Flow:
//   1. fetch the tablet's candidate rowsets and keep only their longest
//      version-consecutive run (gaps are logged as a warning);
//   2. derive the maximum compaction score, lowering it when the previous compaction
//      hit MEM_LIMIT_EXCEEDED or process memory exceeds 80% of the soft limit;
//   3. let the tablet's cumulative compaction policy pick the input rowsets;
//   4. if nothing was picked, possibly advance the cumulative point (see below) and
//      report CUMULATIVE_NO_SUITABLE_VERSION.
//
// @return Status::OK() when _input_rowsets is non-empty on exit;
//         CUMULATIVE_NO_SUITABLE_VERSION otherwise.
Status CumulativeCompaction::pick_rowsets_to_compact() {
    auto candidate_rowsets = tablet()->pick_candidate_rowsets_to_cumulative_compaction();
    if (candidate_rowsets.empty()) {
        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
    }

    // The candidates may contain version gaps, so narrow them down to the longest
    // continuous stretch before handing them to the policy.
    std::vector<Version> missing_versions;
    find_longest_consecutive_version(&candidate_rowsets, &missing_versions);
    if (!missing_versions.empty()) {
        // Gaps are reported as (previous version, next version) pairs.
        DCHECK(missing_versions.size() % 2 == 0);
        LOG(WARNING) << "There are missed versions among rowsets. "
                     << "total missed version size: " << missing_versions.size() / 2
                     << " first missed version prev rowset verison=" << missing_versions[0]
                     << ", first missed version next rowset version=" << missing_versions[1]
                     << ", tablet=" << _tablet->tablet_id();
    }

    int64_t max_score = config::cumulative_compaction_max_deltas;
    const auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
    const bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
    if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
        // Under memory pressure, shrink the score budget but keep it above min_deltas.
        max_score = std::max(config::cumulative_compaction_max_deltas /
                                     config::cumulative_compaction_max_deltas_factor,
                             config::cumulative_compaction_min_deltas + 1);
    }

    size_t compaction_score = 0;
    tablet()->cumulative_compaction_policy()->pick_input_rowsets(
            tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
            &_input_rowsets, &_last_delete_version, &compaction_score,
            _allow_delete_in_cumu_compaction);

    // Cumulative compaction needs at least one rowset; anything below this point
    // handles the "nothing was chosen" case.
    if (!_input_rowsets.empty()) {
        return Status::OK();
    }

    if (_last_delete_version.first != -1) {
        // A delete version was encountered: move the cumulative point just past it
        // (+1 skips the delete version) so base compaction can handle it.
        // NOTICE: the cumulative point may now exceed the tablet's max version; that
        // is acceptable.
        tablet()->set_cumulative_layer_point(_last_delete_version.first + 1);
        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
                "_last_delete_version.first not equal to -1");
    }

    // No delete version was met, so the compaction score is simply too low. Normally
    // we wait for more rowsets and leave the cumulative point alone. To avoid a stall
    // when no new rowset ever arrives, the point is advanced once both base and
    // cumulative compaction have gone without success for long enough.
    const int64_t now = UnixMillis();
    const int64_t last_cumu = tablet()->last_cumu_compaction_success_time();
    const int64_t last_base = tablet()->last_base_compaction_success_time();

    if (last_cumu == 0 && last_base == 0) {
        // Neither timestamp has been recorded yet: initialize both to now.
        tablet()->set_last_cumu_compaction_success_time(now);
        tablet()->set_last_base_compaction_success_time(now);
    } else {
        const int64_t interval_threshold = config::pick_rowset_to_compact_interval_sec * 1000;
        const bool cumu_overdue = (now - last_cumu) > interval_threshold;
        const bool base_overdue = (now - last_base) > interval_threshold;
        if (cumu_overdue && base_overdue) {
            // Before advancing the cumulative point every rowset must be
            // non-overlapping; if any overlaps, compact all candidates first.
            for (const auto& candidate : candidate_rowsets) {
                if (candidate->rowset_meta()->is_segments_overlapping()) {
                    _input_rowsets = candidate_rowsets;
                    return Status::OK();
                }
            }

            // All candidates are non-overlapping: advance the cumulative point.
            tablet()->set_cumulative_layer_point(candidate_rowsets.back()->start_version() + 1);
        }
    }

    return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets is empty");
}
222
223
} // namespace doris