Coverage Report

Created: 2026-05-14 07:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_compaction_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_compaction_action.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <exception>
23
#include <future>
24
#include <memory>
25
#include <mutex>
26
#include <sstream>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "absl/strings/substitute.h"
32
#include "cloud/cloud_base_compaction.h"
33
#include "cloud/cloud_compaction_action.h"
34
#include "cloud/cloud_cumulative_compaction.h"
35
#include "cloud/cloud_full_compaction.h"
36
#include "cloud/cloud_tablet.h"
37
#include "cloud/cloud_tablet_mgr.h"
38
#include "common/logging.h"
39
#include "common/metrics/doris_metrics.h"
40
#include "common/status.h"
41
#include "service/http/http_channel.h"
42
#include "service/http/http_headers.h"
43
#include "service/http/http_request.h"
44
#include "service/http/http_status.h"
45
#include "storage/compaction/base_compaction.h"
46
#include "storage/compaction/cumulative_compaction.h"
47
#include "storage/compaction/cumulative_compaction_policy.h"
48
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
49
#include "storage/compaction/full_compaction.h"
50
#include "storage/compaction_task_tracker.h"
51
#include "storage/olap_define.h"
52
#include "storage/storage_engine.h"
53
#include "storage/tablet/tablet_manager.h"
54
#include "util/stopwatch.hpp"
55
56
namespace doris {
57
using namespace ErrorCode;
58
59
namespace {

// Value sent in the Content-Type response header by CloudCompactionAction::handle().
// File-local: the anonymous namespace gives it internal linkage.
const std::string HEADER_JSON = "application/json";

} // namespace
62
63
CloudCompactionAction::CloudCompactionAction(CompactionActionType ctype, ExecEnv* exec_env,
64
                                             CloudStorageEngine& engine, TPrivilegeHier::type hier,
65
                                             TPrivilegeType::type ptype)
66
3
        : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {}
67
68
/// check param and fetch tablet_id & table_id from req
69
587
static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) {
70
    // req tablet id and table id, we have to set only one of them.
71
587
    std::string req_tablet_id = req->param(TABLET_ID_KEY);
72
587
    std::string req_table_id = req->param(TABLE_ID_KEY);
73
587
    if (req_tablet_id == "") {
74
100
        if (req_table_id == "") {
75
            // both tablet id and table id are empty, return error.
76
0
            return Status::InternalError(
77
0
                    "tablet id and table id can not be empty at the same time!");
78
100
        } else {
79
100
            try {
80
100
                *table_id = std::stoull(req_table_id);
81
100
            } catch (const std::exception& e) {
82
0
                return Status::InternalError("convert table_id failed, {}", e.what());
83
0
            }
84
100
            return Status::OK();
85
100
        }
86
487
    } else {
87
487
        if (req_table_id == "") {
88
487
            try {
89
487
                *tablet_id = std::stoull(req_tablet_id);
90
487
            } catch (const std::exception& e) {
91
0
                return Status::InternalError("convert tablet_id failed, {}", e.what());
92
0
            }
93
487
            return Status::OK();
94
487
        } else {
95
            // both tablet id and table id are not empty, return err.
96
0
            return Status::InternalError("tablet id and table id can not be set at the same time!");
97
0
        }
98
487
    }
99
587
}
100
101
/// retrieve specific id from req
102
3.78k
static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) {
103
3.78k
    const auto& req_id_param = req->param(param_name);
104
3.78k
    if (!req_id_param.empty()) {
105
3.78k
        try {
106
3.78k
            *id_param = std::stoull(req_id_param);
107
3.78k
        } catch (const std::exception& e) {
108
0
            return Status::InternalError("convert {} failed, {}", param_name, e.what());
109
0
        }
110
3.78k
    }
111
112
3.78k
    return Status::OK();
113
3.78k
}
114
115
// for viewing the compaction status
116
2.90k
Status CloudCompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) {
117
2.90k
    uint64_t tablet_id = 0;
118
2.90k
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
119
2.90k
                                   "check param failed");
120
2.90k
    if (tablet_id == 0) {
121
0
        return Status::InternalError("check param failed: missing tablet_id");
122
0
    }
123
124
2.90k
    LOG(INFO) << "begin to handle show compaction, tablet id: " << tablet_id;
125
126
    //TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
127
2.90k
    CloudTabletSPtr tablet = DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
128
2.90k
    if (tablet == nullptr) {
129
0
        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
130
0
    }
131
132
2.90k
    tablet->get_compaction_status(json_result);
133
2.90k
    LOG(INFO) << "finished to handle show compaction, tablet id: " << tablet_id;
134
2.90k
    return Status::OK();
135
2.90k
}
136
137
587
/// Serves the "run compaction" action: validates the request, records the schedule
/// time on the tablet and submits a manual compaction task.
///
/// On success `*json_result` holds a JSON object with status "Success" and a message
/// naming the table id and tablet id. Any failure is returned as a non-OK Status and
/// `*json_result` is left untouched.
Status CloudCompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) {
    // 1. Parameter check: exactly one of tablet id / table id must be supplied.
    uint64_t tablet_id = 0;
    uint64_t table_id = 0;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed");
    LOG(INFO) << "begin to handle run compaction, tablet id: " << tablet_id
              << " table id: " << table_id;

    // The requested type must be one of 'base', 'cumulative' or 'full'.
    const auto& requested_type = req->param(PARAM_COMPACTION_TYPE);
    const bool is_base = requested_type == PARAM_COMPACTION_BASE;
    const bool is_cumulative = requested_type == PARAM_COMPACTION_CUMULATIVE;
    const bool is_full = requested_type == PARAM_COMPACTION_FULL;
    if (!(is_base || is_cumulative || is_full)) {
        return Status::NotSupported("The compaction type '{}' is not supported", requested_type);
    }

    // 2. Look up the tablet; the third get_tablet argument is set for every type
    //    except 'full'.
    // NOTE(review): when the request carries only a table id, tablet_id is still 0
    // here and is passed to get_tablet() as-is — confirm that is the intended
    // behaviour for table-level requests.
    const bool sync_delete_bitmap = !is_full;
    CloudTabletSPtr tablet =
            DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id, false, sync_delete_bitmap));
    if (!tablet) {
        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
    }

    // Map the validated request string to the engine's compaction type once, then
    // stamp the matching "last schedule time" on the tablet.
    const CompactionType task_type = is_base         ? CompactionType::BASE_COMPACTION
                                     : is_cumulative ? CompactionType::CUMULATIVE_COMPACTION
                                                     : CompactionType::FULL_COMPACTION;
    switch (task_type) {
    case CompactionType::BASE_COMPACTION:
        tablet->set_last_base_compaction_schedule_time(UnixMillis());
        break;
    case CompactionType::CUMULATIVE_COMPACTION:
        tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
        break;
    default:
        // Only 'full' can reach here because of the validation above.
        tablet->set_last_full_compaction_schedule_time(UnixMillis());
        break;
    }

    LOG(INFO) << "manual submit compaction task, tablet id: " << tablet_id
              << " table id: " << table_id;
    // 3. Submit the compaction task (trigger_method=1 for MANUAL).
    RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, task_type, /*trigger_method=*/1));

    LOG(INFO) << "Manual compaction task is successfully triggered, tablet id: " << tablet_id
              << " table id: " << table_id;

    std::string reply =
            R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )";
    reply += std::to_string(table_id);
    reply += ". Tablet id: ";
    reply += std::to_string(tablet_id);
    reply += "\"}";
    *json_result = std::move(reply);
    return Status::OK();
}
185
186
/// Serves the "run status" action.
///
/// - Without a tablet id (value stays 0): asks the engine for the overall compaction
///   status JSON and stores it in `*json_result`.
/// - With a tablet id: reports whether a cumulative, base or full compaction is
///   registered for that tablet (checked in that order; the first match wins) as a
///   JSON object with the fields status / run_status / msg / tablet_id / compact_type.
///   When nothing is running, run_status is false and compact_type is empty.
///
/// The "finished" log line is emitted on every successful path.
Status CloudCompactionAction::_handle_run_status_compaction(HttpRequest* req,
                                                            std::string* json_result) {
    uint64_t tablet_id = 0;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
                                   "check param failed");
    LOG(INFO) << "begin to handle run status compaction, tablet id: " << tablet_id;

    if (tablet_id == 0) {
        // overall compaction status
        RETURN_IF_ERROR(_engine.get_compaction_status_json(json_result));
    } else {
        // $0..$3 are absl::Substitute placeholders; the whitespace inside the raw
        // string is part of the response body.
        const std::string json_template = R"({
            "status" : "Success",
            "run_status" : $0,
            "msg" : "$1",
            "tablet_id" : $2,
            "compact_type" : "$3"
        })";

        // Pick the first compaction kind registered for this tablet, if any.
        std::string compaction_type;
        if (_engine.has_cumu_compaction(tablet_id)) {
            compaction_type = "cumulative";
        } else if (_engine.has_base_compaction(tablet_id)) {
            compaction_type = "base";
        } else if (_engine.has_full_compaction(tablet_id)) {
            compaction_type = "full";
        }

        const bool run_status = !compaction_type.empty();
        const std::string msg = run_status ? "compaction task for this tablet is running"
                                           : "compaction task for this tablet is not running";
        *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type);
    }

    LOG(INFO) << "finished to handle run status compaction, tablet id: " << tablet_id;
    return Status::OK();
}
241
242
4.37k
/// HTTP entry point: dispatches on the action type this handler was constructed with
/// and replies with JSON.
///
/// The reply is always sent with HttpStatus::OK; a failed action is conveyed by the
/// body, which is then the Status rendered via to_json() instead of the action's own
/// result.
void CloudCompactionAction::handle(HttpRequest* req) {
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());

    std::string json_result;
    const Status st = [&]() -> Status {
        if (_compaction_type == CompactionActionType::SHOW_INFO) {
            return _handle_show_compaction(req, &json_result);
        }
        if (_compaction_type == CompactionActionType::RUN_COMPACTION) {
            return _handle_run_compaction(req, &json_result);
        }
        // Every other action type is treated as a run-status query.
        return _handle_run_status_compaction(req, &json_result);
    }();

    if (st.ok()) {
        HttpChannel::send_reply(req, HttpStatus::OK, json_result);
    } else {
        HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
    }
}
271
272
} // end namespace doris