Coverage Report

Created: 2026-06-02 16:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction/binlog_compaction.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction/binlog_compaction.h"
19
20
#include <cpp/sync_point.h>
21
22
#include <mutex>
23
#include <string>
24
#include <vector>
25
26
#include "common/logging.h"
27
#include "runtime/thread_context.h"
28
#include "storage/compaction/binlog_compaction_policy.h"
29
#include "storage/rowset/rowset.h"
30
#include "storage/rowset/rowset_meta.h"
31
#include "storage/tablet/tablet.h"
32
#include "util/defer_op.h"
33
#include "util/time.h"
34
#include "util/trace.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
39
BinlogCompaction::BinlogCompaction(StorageEngine& engine, const TabletSharedPtr& tablet,
40
                                   int8_t compaction_level)
41
0
        : CompactionMixin(engine, tablet,
42
0
                          "BinlogCompaction:" + std::to_string(tablet->tablet_id())),
43
0
          _compaction_level(compaction_level) {}
44
45
0
// Out-of-line defaulted destructor: no cleanup beyond what the members and the
// base class already perform.
BinlogCompaction::~BinlogCompaction() = default;
46
47
0
// Prepares a binlog compaction round: validates the tablet, makes sure no other
// binlog compaction currently holds the tablet's binlog compaction lock, and
// picks the input rowsets.
//
// Returns OK when `_input_rowsets` has been populated; otherwise returns the
// error that stopped preparation. On any failure the error text is also stored
// on the tablet through set_last_binlog_compaction_status().
Status BinlogCompaction::prepare_compact() {
    Status compact_status;
    // Runs when the function exits; only failures are recorded at this stage.
    Defer record_failure([&] {
        if (!compact_status.ok()) {
            tablet()->set_last_binlog_compaction_status(compact_status.to_string());
        }
    });

    // Guard: a tablet whose initialization failed cannot be compacted.
    if (!tablet()->init_succeeded()) {
        compact_status =
                Status::Error<BINLOG_COMPACTION_INVALID_PARAMETERS, false>("_tablet init failed");
        return compact_status;
    }

    // Guard: non-blocking attempt on the binlog compaction lock. The lock is
    // only held for the duration of this function.
    std::unique_lock<std::mutex> compaction_guard(tablet()->get_binlog_compaction_lock(),
                                                  std::try_to_lock);
    if (!compaction_guard) {
        compact_status = Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under binlog compaction. tablet={}", _tablet->tablet_id());
        return compact_status;
    }

    // RETURN_IF_ERROR is kept on purpose so that `compact_status` itself still
    // carries the error when the deferred callback above inspects it.
    compact_status = pick_rowsets_to_compact();
    RETURN_IF_ERROR(compact_status);

    COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size());

    compact_status = Status::OK();
    return compact_status;
}
75
76
0
// Runs the binlog compaction that prepare_compact() set up.
//
// Takes the tablet's binlog compaction lock (non-blocking), attaches the task's
// memory tracker, and delegates the actual work to
// CompactionMixin::execute_compact().
//
// Whatever the outcome, the final status string is stored on the tablet; in
// addition the success time (for `_compaction_level`) or the failure time is
// updated with the current UnixMillis().
Status BinlogCompaction::execute_compact() {
    Status compact_status;
    // Runs when the function exits and publishes the outcome on the tablet.
    Defer publish_outcome([&] {
        tablet()->set_last_binlog_compaction_status(compact_status.to_string());
        if (compact_status.ok()) {
            tablet()->set_last_binlog_compaction_success_time(_compaction_level, UnixMillis());
        } else {
            tablet()->set_last_binlog_compaction_failure_time(UnixMillis());
        }
    });

    // Guard: bail out instead of waiting if another binlog compaction holds the lock.
    std::unique_lock<std::mutex> compaction_guard(tablet()->get_binlog_compaction_lock(),
                                                  std::try_to_lock);
    if (!compaction_guard) {
        compact_status = Status::Error<TRY_LOCK_FAILED, false>(
                "The tablet is under binlog compaction. tablet={}", _tablet->tablet_id());
        return compact_status;
    }

    SCOPED_ATTACH_TASK(_mem_tracker);

    // RETURN_IF_ERROR is kept on purpose so that `compact_status` itself still
    // carries the error when the deferred callback above reads it.
    compact_status = CompactionMixin::execute_compact();
    RETURN_IF_ERROR(compact_status);

    // Test-only hook; may return Status::OK() early when the sync point is active.
    TEST_SYNC_POINT_RETURN_WITH_VALUE("binlog_compaction::BinlogCompaction::execute_compact",
                                      Status::OK());

    DCHECK_EQ(_state, CompactionState::SUCCESS);

    compact_status = Status::OK();
    return compact_status;
}
107
108
0
// Chooses the rowsets this compaction will merge and stores them in
// `_input_rowsets`.
//
// Steps:
//   1. Ask the tablet for its binlog compaction candidates.
//   2. Run find_longest_consecutive_version() over the candidates and warn if
//      it reports missing versions.
//   3. Let the tablet's binlog compaction policy pick the inputs for
//      `_compaction_level`.
//
// Returns OK when at least one input rowset was picked, otherwise
// BINLOG_COMPACTION_NO_SUITABLE_VERSION (either no candidates at all, or the
// policy selected nothing).
Status BinlogCompaction::pick_rowsets_to_compact() {
    auto candidate_rowsets = tablet()->pick_candidate_rowsets_to_binlog_compaction();
    if (candidate_rowsets.empty()) {
        return Status::Error<BINLOG_COMPACTION_NO_SUITABLE_VERSION>("candidate_rowsets is empty");
    }

    // candidate_rowsets may not be continuous
    // So we need to choose the longest continuous path from it.
    // NOTE(review): missing_versions is presumably filled with (prev, next)
    // version pairs, one pair per gap -- inferred from the DCHECK and the log
    // text below; confirm against find_longest_consecutive_version().
    std::vector<Version> missing_versions;
    find_longest_consecutive_version(&candidate_rowsets, &missing_versions);
    DCHECK(missing_versions.size() % 2 == 0);
    // The DCHECK above compiles out in release builds, so require at least two
    // entries before indexing [0] and [1] instead of merely "not empty".
    if (missing_versions.size() >= 2) {
        LOG(WARNING) << "There are missed versions among binlog rowsets. "
                     << "total missed version size: " << missing_versions.size() / 2
                     << ", first missed version prev rowset version=" << missing_versions[0]
                     << ", first missed version next rowset version=" << missing_versions[1]
                     << ", tablet=" << _tablet->tablet_id();
    }

    tablet()->binlog_compaction_policy()->pick_input_rowsets(tablet(), candidate_rowsets,
                                                             _compaction_level, &_input_rowsets);

    // Binlog compaction processes at least one rowset, so an empty selection is
    // reported as BINLOG_COMPACTION_NO_SUITABLE_VERSION.
    if (_input_rowsets.empty()) {
        VLOG_DEBUG << "binlog compaction can't get input rowsets, tablet=" << _tablet->tablet_id();
        return Status::Error<BINLOG_COMPACTION_NO_SUITABLE_VERSION>("_input_rowsets is empty");
    }

    return Status::OK();
}
139
140
0
// Publishes the compaction result on the tablet: replaces `_input_rowsets`
// with `_output_rowset` in the tablet's row-binlog rowsets and then saves the
// tablet meta.
//
// Returns OK on success, or the error from modify_row_binlog_rowsets().
Status BinlogCompaction::modify_rowsets() {
    std::vector<RowsetSharedPtr> output_rowsets {_output_rowset};

    // An ordered-data compaction running at the level just below the maximum
    // has its output marked as OVERLAPPING.
    const bool mark_overlapping =
            _is_ordered_data_compaction &&
            _compaction_level == BinlogCompactionPolicy::kBinlogCompactionMaxLevel - 1;
    if (mark_overlapping) {
        _output_rowset->rowset_meta()->set_segments_overlap(OVERLAPPING);
    }

    tablet()->binlog_compaction_policy()->update_compaction_level(tablet(), _input_rowsets,
                                                                  _output_rowset);

    {
        // Lock order: rowset update lock first, then the header lock (exclusive).
        std::lock_guard<std::mutex> rowset_update_guard(tablet()->get_rowset_update_lock());
        std::lock_guard<std::shared_mutex> header_write_guard(_tablet->get_header_lock());
        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
        RETURN_IF_ERROR(tablet()->modify_row_binlog_rowsets(output_rowsets, _input_rowsets));
    }

    {
        // The meta is saved while holding the header lock in shared mode.
        std::shared_lock header_read_guard(_tablet->get_header_lock());
        tablet()->save_meta();
    }

    return Status::OK();
}
162
163
} // namespace doris