Coverage Report

Created: 2024-11-21 16:04

/root/doris/be/src/olap/cumulative_compaction.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/cumulative_compaction.h"
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <mutex>
25
#include <ostream>
26
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "olap/cumulative_compaction_policy.h"
30
#include "olap/olap_define.h"
31
#include "olap/rowset/rowset_meta.h"
32
#include "runtime/thread_context.h"
33
#include "util/doris_metrics.h"
34
#include "util/time.h"
35
#include "util/trace.h"
36
37
namespace doris {
38
using namespace ErrorCode;
39
40
// Creates a cumulative compaction task for `tablet`. The label handed to the
// Compaction base class is "CumulativeCompaction:" followed by the tablet id.
CumulativeCompaction::CumulativeCompaction(const TabletSharedPtr& tablet)
        : Compaction(tablet, std::string("CumulativeCompaction:") +
                                     std::to_string(tablet->tablet_id())) {}

// Nothing beyond member/base cleanup is required.
CumulativeCompaction::~CumulativeCompaction() = default;
44
45
11
// Prepares a cumulative compaction of `_tablet`:
//   1. refuses to run if the tablet did not initialize successfully,
//   2. takes the tablet's cumulative compaction mutex without blocking,
//   3. recalculates the cumulative point,
//   4. picks the input rowsets (see pick_rowsets_to_compact()).
//
// Returns CUMULATIVE_INVALID_PARAMETERS when tablet init failed, TRY_LOCK_FAILED
// when another cumulative compaction holds the mutex, otherwise the status of
// pick_rowsets_to_compact().
Status CumulativeCompaction::prepare_compact() {
    if (!_tablet->init_succeeded()) {
        return Status::Error<CUMULATIVE_INVALID_PARAMETERS, false>("_tablet init failed");
    }

    // Non-blocking acquisition: if the mutex is already held we report failure
    // instead of waiting. The guard releases the mutex when this function returns.
    std::unique_lock<std::mutex> cumu_guard(_tablet->get_cumulative_compaction_lock(),
                                            std::try_to_lock);
    if (!cumu_guard.owns_lock()) {
        return Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
    }

    // Step 1: refresh the cumulative point before choosing rowsets.
    _tablet->calculate_cumulative_point();
    VLOG_CRITICAL << "after calculate, current cumulative point is "
                  << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    // Step 2: select the rowsets to merge and record how many were chosen.
    RETURN_IF_ERROR(pick_rowsets_to_compact());
    COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());

    return Status::OK();
}
67
68
0
// Executes the cumulative compaction prepared by prepare_compact().
//
// Takes the tablet's cumulative compaction mutex without blocking, attaches the
// compaction memory tracker, merges the picked rowsets, then:
//   - marks the task SUCCESS,
//   - updates the compaction level when the tablet's
//     time_series_compaction_level_threshold is >= 2,
//   - advances the cumulative point through the tablet's compaction policy,
//   - bumps the cumulative compaction delta/byte metrics.
//
// Returns TRY_LOCK_FAILED if another cumulative compaction holds the mutex, or
// the error from do_compaction().
Status CumulativeCompaction::execute_compact_impl() {
    std::unique_lock<std::mutex> cumu_guard(_tablet->get_cumulative_compaction_lock(),
                                            std::try_to_lock);
    if (!cumu_guard.owns_lock()) {
        return Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id());
    }

    SCOPED_ATTACH_TASK(_mem_tracker);

    // Step 3: merge the input rowsets, using the permits computed for this task.
    RETURN_IF_ERROR(do_compaction(get_compaction_permits()));

    // Step 4: the merge succeeded.
    _state = CompactionState::SUCCESS;

    // Step 5: time-series tablets track a compaction level per output rowset.
    if (_tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
        _tablet->cumulative_compaction_policy()->update_compaction_level(
                _tablet.get(), _input_rowsets, _output_rowset);
    }

    // Step 6: move the cumulative point according to the policy.
    _tablet->cumulative_compaction_policy()->update_cumulative_point(
            _tablet.get(), _input_rowsets, _output_rowset, _last_delete_version);
    VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
                  << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id();

    // Step 7: account the merged deltas and bytes in the global metrics.
    auto* metrics = DorisMetrics::instance();
    metrics->cumulative_compaction_deltas_total->increment(_input_rowsets.size());
    metrics->cumulative_compaction_bytes_total->increment(_input_rowsets_size);

    return Status::OK();
}
102
103
11
// Selects the input rowsets for this cumulative compaction.
//
// On success `_input_rowsets` holds the rowsets to merge. The policy also writes
// `_last_delete_version`.
//
// Returns CUMULATIVE_NO_SUITABLE_VERSION when there are no candidate rowsets or
// when the policy picked none. In the "picked none" case the cumulative point may
// be advanced as a side effect:
//   - past a delete version, so base compaction can handle it, or
//   - past the candidates, after neither base nor cumulative compaction has
//     succeeded for `pick_rowset_to_compact_interval_sec`.
// In the second case, if any candidate rowset has overlapping segments, all
// candidates are returned as input instead (Status::OK()).
Status CumulativeCompaction::pick_rowsets_to_compact() {
    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
    if (candidate_rowsets.empty()) {
        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
    }

    // candidate_rowsets may not be continuous,
    // so we keep only the longest consecutive run of versions.
    std::vector<Version> missing_versions;
    RETURN_IF_ERROR(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
    if (!missing_versions.empty()) {
        DCHECK(missing_versions.size() == 2);
        // DCHECK is compiled out in release builds; never index past the end.
        if (missing_versions.size() >= 2) {
            LOG(WARNING) << "There are missed versions among rowsets. "
                         << "prev rowset version=" << missing_versions[0]
                         << ", next rowset version=" << missing_versions[1]
                         << ", tablet=" << _tablet->tablet_id();
        }
    }

    // Under memory pressure (process usage above 80% of the soft limit, or the
    // previous compaction hit MEM_LIMIT_EXCEEDED), shrink the maximum number of
    // deltas merged at once, but keep it at least min_deltas + 1.
    int64_t max_score = config::cumulative_compaction_max_deltas;
    auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
    bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
    if (_tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
        // The factor comes from a runtime config. A value of 0 would be an integer
        // division by zero, so non-positive values are clamped to 1 (no reduction).
        const int64_t max_deltas_factor =
                std::max<int64_t>(config::cumulative_compaction_max_deltas_factor, 1);
        max_score = std::max<int64_t>(config::cumulative_compaction_max_deltas / max_deltas_factor,
                                      config::cumulative_compaction_min_deltas + 1);
    }

    size_t compaction_score = 0;
    _tablet->cumulative_compaction_policy()->pick_input_rowsets(
            _tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
            &_input_rowsets, &_last_delete_version, &compaction_score,
            allow_delete_in_cumu_compaction());

    // Cumulative compaction needs at least 1 rowset. When none was chosen we return
    // CUMULATIVE_NO_SUITABLE_VERSION, possibly after moving the cumulative point.
    if (_input_rowsets.empty()) {
        if (_last_delete_version.first != -1) {
            // We met a delete version. Move the cumulative point past it (+1) so that
            // base compaction handles the delete version.
            // NOTICE: the cumulative point may then exceed the tablet's max version;
            // that is acceptable.
            _tablet->set_cumulative_layer_point(_last_delete_version.first + 1);
            return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(
                    "_last_delete_version.first not equal to -1");
        }

        // No delete version was met, so the compaction score is too low. Normally we
        // keep the cumulative point and wait for more rowsets. If no new rowsets
        // arrive, compaction would stall, so after a long time we advance the
        // cumulative point anyway to let base compaction continue.

        // Check the last success time of both base and cumulative compaction.
        int64_t now = UnixMillis();
        int64_t last_cumu = _tablet->last_cumu_compaction_success_time();
        int64_t last_base = _tablet->last_base_compaction_success_time();
        if (last_cumu != 0 || last_base != 0) {
            int64_t interval_threshold = config::pick_rowset_to_compact_interval_sec * 1000;
            int64_t cumu_interval = now - last_cumu;
            int64_t base_interval = now - last_base;
            if (cumu_interval > interval_threshold && base_interval > interval_threshold) {
                // Before increasing the cumulative point, make sure all rowsets are
                // non-overlapping. If any rowset overlaps, compact the candidates first.
                for (const auto& rs : candidate_rowsets) {
                    if (rs->rowset_meta()->is_segments_overlapping()) {
                        _input_rowsets = candidate_rowsets;
                        return Status::OK();
                    }
                }

                // All candidate rowsets are non-overlapping; increase the cumulative point.
                _tablet->set_cumulative_layer_point(candidate_rowsets.back()->start_version() + 1);
            }
        } else {
            // Both success times are still 0 here, so initialize both for the
            // first time.
            _tablet->set_last_cumu_compaction_success_time(now);
            _tablet->set_last_base_compaction_success_time(now);
        }

        return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets is empty");
    }

    return Status::OK();
}
190
191
} // namespace doris