Coverage Report

Created: 2026-03-13 14:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/compaction_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/compaction_action.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <exception>
23
#include <future>
24
#include <memory>
25
#include <mutex>
26
#include <sstream>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "absl/strings/substitute.h"
32
#include "common/logging.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/status.h"
35
#include "service/http/http_channel.h"
36
#include "service/http/http_headers.h"
37
#include "service/http/http_request.h"
38
#include "service/http/http_status.h"
39
#include "storage/compaction/base_compaction.h"
40
#include "storage/compaction/cumulative_compaction.h"
41
#include "storage/compaction/cumulative_compaction_policy.h"
42
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
43
#include "storage/compaction/full_compaction.h"
44
#include "storage/compaction/single_replica_compaction.h"
45
#include "storage/olap_define.h"
46
#include "storage/storage_engine.h"
47
#include "storage/tablet/tablet_manager.h"
48
#include "util/stopwatch.hpp"
49
50
namespace doris {
51
using namespace ErrorCode;
52
53
namespace {
54
55
constexpr std::string_view HEADER_JSON = "application/json";
56
57
} // namespace
58
59
CompactionAction::CompactionAction(CompactionActionType ctype, ExecEnv* exec_env,
60
                                   StorageEngine& engine, TPrivilegeHier::type hier,
61
                                   TPrivilegeType::type ptype)
62
18
        : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {}
63
64
/// check param and fetch tablet_id & table_id from req
65
0
static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) {
66
    // req tablet id and table id, we have to set only one of them.
67
0
    const auto& req_tablet_id = req->param(TABLET_ID_KEY);
68
0
    const auto& req_table_id = req->param(TABLE_ID_KEY);
69
0
    if (req_tablet_id.empty()) {
70
0
        if (req_table_id.empty()) {
71
            // both tablet id and table id are empty, return error.
72
0
            return Status::InternalError(
73
0
                    "tablet id and table id can not be empty at the same time!");
74
0
        } else {
75
0
            try {
76
0
                *table_id = std::stoull(req_table_id);
77
0
            } catch (const std::exception& e) {
78
0
                return Status::InternalError("convert table_id failed, {}", e.what());
79
0
            }
80
0
            return Status::OK();
81
0
        }
82
0
    } else if (req_table_id.empty()) {
83
0
        try {
84
0
            *tablet_id = std::stoull(req_tablet_id);
85
0
        } catch (const std::exception& e) {
86
0
            return Status::InternalError("convert tablet_id failed, {}", e.what());
87
0
        }
88
0
        return Status::OK();
89
0
    } else {
90
        // both tablet id and table id are not empty, return err.
91
0
        return Status::InternalError("tablet id and table id can not be set at the same time!");
92
0
    }
93
0
}
94
95
/// retrieve specific id from req
96
0
static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) {
97
0
    const auto& req_id_param = req->param(param_name);
98
0
    if (!req_id_param.empty()) {
99
0
        try {
100
0
            *id_param = std::stoull(req_id_param);
101
0
        } catch (const std::exception& e) {
102
0
            return Status::InternalError("convert {} failed, {}", param_name, e.what());
103
0
        }
104
0
    }
105
106
0
    return Status::OK();
107
0
}
108
109
// for viewing the compaction status
110
0
Status CompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) {
111
0
    uint64_t tablet_id = 0;
112
    // check & retrieve tablet_id from req if it contains
113
0
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
114
0
                                   "check param failed");
115
0
    if (tablet_id == 0) {
116
0
        return Status::InternalError("check param failed: missing tablet_id");
117
0
    }
118
119
0
    TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
120
0
    if (tablet == nullptr) {
121
0
        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
122
0
    }
123
124
0
    tablet->get_compaction_status(json_result);
125
0
    return Status::OK();
126
0
}
127
128
0
Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) {
129
    // 1. param check
130
    // check req_tablet_id or req_table_id is not empty and can not be set together.
131
0
    uint64_t tablet_id = 0;
132
0
    uint64_t table_id = 0;
133
0
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed");
134
135
    // check compaction_type equals 'base' or 'cumulative'
136
0
    std::string compaction_type = req->param(PARAM_COMPACTION_TYPE);
137
0
    if (compaction_type != PARAM_COMPACTION_BASE &&
138
0
        compaction_type != PARAM_COMPACTION_CUMULATIVE &&
139
0
        compaction_type != PARAM_COMPACTION_FULL) {
140
0
        return Status::NotSupported("The compaction type '{}' is not supported", compaction_type);
141
0
    }
142
143
    // "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer
144
0
    bool fetch_from_remote = false;
145
0
    std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
146
0
    if (param_remote == "true") {
147
0
        fetch_from_remote = true;
148
0
    } else if (!param_remote.empty() && param_remote != "false") {
149
0
        return Status::NotSupported("The remote = '{}' is not supported", param_remote);
150
0
    }
151
152
0
    if (tablet_id == 0 && table_id != 0) {
153
0
        std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet(
154
0
                [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; });
155
0
        for (const auto& tablet : tablet_vec) {
156
0
            tablet->set_last_full_compaction_schedule_time(UnixMillis());
157
0
            RETURN_IF_ERROR(
158
0
                    _engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION, false));
159
0
        }
160
0
    } else {
161
        // 2. fetch the tablet by tablet_id
162
0
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
163
0
        if (tablet == nullptr) {
164
0
            return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
165
0
        }
166
167
0
        if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
168
0
            return Status::NotSupported("tablet should do compaction locally");
169
0
        }
170
0
        DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", {
171
0
            RETURN_IF_ERROR(_engine.submit_compaction_task(
172
0
                    tablet, CompactionType::CUMULATIVE_COMPACTION, false));
173
0
            LOG(INFO) << "Manual debug compaction task is successfully triggered";
174
0
            *json_result =
175
0
                    R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" +
176
0
                    std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}";
177
0
            return Status::OK();
178
0
        })
179
180
        // 3. execute compaction task
181
0
        std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
182
0
            return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
183
0
        });
184
0
        std::future<Status> future_obj = task.get_future();
185
0
        std::thread(std::move(task)).detach();
186
187
        // 更新schedule_time
188
0
        if (compaction_type == PARAM_COMPACTION_BASE) {
189
0
            tablet->set_last_base_compaction_schedule_time(UnixMillis());
190
0
        } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
191
0
            tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
192
0
        } else if (compaction_type == PARAM_COMPACTION_FULL) {
193
0
            tablet->set_last_full_compaction_schedule_time(UnixMillis());
194
0
        }
195
196
        // 4. wait for result for 2 seconds by async
197
0
        std::future_status status = future_obj.wait_for(std::chrono::seconds(2));
198
0
        if (status == std::future_status::ready) {
199
            // fetch execute result
200
0
            Status olap_status = future_obj.get();
201
0
            if (!olap_status.ok()) {
202
0
                return olap_status;
203
0
            }
204
0
        } else {
205
0
            LOG(INFO) << "Manual compaction task is timeout for waiting "
206
0
                      << (status == std::future_status::timeout);
207
0
        }
208
0
    }
209
0
    LOG(INFO) << "Manual compaction task is successfully triggered";
210
0
    *json_result =
211
0
            R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" +
212
0
            std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}";
213
0
    return Status::OK();
214
0
}
215
216
0
Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::string* json_result) {
217
0
    uint64_t tablet_id = 0;
218
    // check & retrieve tablet_id from req if it contains
219
0
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
220
0
                                   "check param failed");
221
222
0
    if (tablet_id == 0) {
223
        // overall compaction status
224
0
        _engine.get_compaction_status_json(json_result);
225
0
        return Status::OK();
226
0
    } else {
227
        // fetch the tablet by tablet_id
228
0
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
229
0
        if (tablet == nullptr) {
230
0
            LOG(WARNING) << "invalid argument.tablet_id:" << tablet_id;
231
0
            return Status::InternalError("fail to get {}", tablet_id);
232
0
        }
233
234
0
        std::string json_template = R"({
235
0
            "status" : "Success",
236
0
            "run_status" : $0,
237
0
            "msg" : "$1",
238
0
            "tablet_id" : $2,
239
0
            "compact_type" : "$3"
240
0
        })";
241
242
0
        std::string msg = "compaction task for this tablet is not running";
243
0
        std::string compaction_type;
244
0
        bool run_status = false;
245
246
0
        {
247
            // Full compaction holds both base compaction lock and cumu compaction lock.
248
            // So we can not judge if full compaction is running by check these two locks holding.
249
            // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running.
250
0
            if (tablet->is_full_compaction_running()) {
251
0
                msg = "compaction task for this tablet is running";
252
0
                compaction_type = "full";
253
0
                run_status = true;
254
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
255
0
                                                compaction_type);
256
0
                return Status::OK();
257
0
            }
258
0
        }
259
260
0
        {
261
            // use try lock to check this tablet is running cumulative compaction
262
0
            std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(),
263
0
                                                         std::try_to_lock);
264
0
            if (!lock_cumulative.owns_lock()) {
265
0
                msg = "compaction task for this tablet is running";
266
0
                compaction_type = "cumulative";
267
0
                run_status = true;
268
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
269
0
                                                compaction_type);
270
0
                return Status::OK();
271
0
            }
272
0
        }
273
274
0
        {
275
            // use try lock to check this tablet is running base compaction
276
0
            std::unique_lock<std::mutex> lock_base(tablet->get_base_compaction_lock(),
277
0
                                                   std::try_to_lock);
278
0
            if (!lock_base.owns_lock()) {
279
0
                msg = "compaction task for this tablet is running";
280
0
                compaction_type = "base";
281
0
                run_status = true;
282
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
283
0
                                                compaction_type);
284
0
                return Status::OK();
285
0
            }
286
0
        }
287
        // not running any compaction
288
0
        *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type);
289
0
        return Status::OK();
290
0
    }
291
0
}
292
293
Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
294
                                                      const std::string& compaction_type,
295
0
                                                      bool fetch_from_remote) {
296
0
    MonotonicStopWatch timer;
297
0
    timer.start();
298
299
0
    std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
300
0
            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
301
0
                    tablet->tablet_meta()->compaction_policy());
302
0
    if (tablet->get_cumulative_compaction_policy() == nullptr) {
303
0
        tablet->set_cumulative_compaction_policy(cumulative_compaction_policy);
304
0
    }
305
0
    Status res = Status::OK();
306
0
    auto do_compact = [](Compaction& compaction) {
307
0
        RETURN_IF_ERROR(compaction.prepare_compact());
308
0
        return compaction.execute_compact();
309
0
    };
310
0
    if (compaction_type == PARAM_COMPACTION_BASE) {
311
0
        BaseCompaction base_compaction(_engine, tablet);
312
0
        res = do_compact(base_compaction);
313
0
        if (!res) {
314
0
            if (!res.is<BE_NO_SUITABLE_VERSION>()) {
315
0
                DorisMetrics::instance()->base_compaction_request_failed->increment(1);
316
0
            }
317
0
        }
318
0
    } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
319
0
        if (fetch_from_remote) {
320
0
            SingleReplicaCompaction single_compaction(_engine, tablet,
321
0
                                                      CompactionType::CUMULATIVE_COMPACTION);
322
0
            res = do_compact(single_compaction);
323
0
            if (!res) {
324
0
                LOG(WARNING) << "failed to do single compaction. res=" << res
325
0
                             << ", table=" << tablet->tablet_id();
326
0
            }
327
0
        } else {
328
0
            CumulativeCompaction cumulative_compaction(_engine, tablet);
329
0
            res = do_compact(cumulative_compaction);
330
0
            if (!res) {
331
0
                if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
332
                    // Ignore this error code.
333
0
                    VLOG_NOTICE
334
0
                            << "failed to init cumulative compaction due to no suitable version,"
335
0
                            << "tablet=" << tablet->tablet_id();
336
0
                } else {
337
0
                    DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
338
0
                    LOG(WARNING) << "failed to do cumulative compaction. res=" << res
339
0
                                 << ", table=" << tablet->tablet_id();
340
0
                }
341
0
            }
342
0
        }
343
0
    } else if (compaction_type == PARAM_COMPACTION_FULL) {
344
0
        FullCompaction full_compaction(_engine, tablet);
345
0
        res = do_compact(full_compaction);
346
0
        if (!res) {
347
0
            if (res.is<FULL_NO_SUITABLE_VERSION>()) {
348
                // Ignore this error code.
349
0
                VLOG_NOTICE << "failed to init full compaction due to no suitable version,"
350
0
                            << "tablet=" << tablet->tablet_id();
351
0
            } else {
352
0
                LOG(WARNING) << "failed to do full compaction. res=" << res
353
0
                             << ", table=" << tablet->tablet_id();
354
0
            }
355
0
        }
356
0
    }
357
0
    timer.stop();
358
0
    LOG(INFO) << "Manual compaction task finish, status=" << res
359
0
              << ", compaction_use_time=" << timer.elapsed_time() / 1000000 << "ms";
360
0
    return res;
361
0
}
362
363
0
void CompactionAction::handle(HttpRequest* req) {
364
0
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
365
366
0
    if (_compaction_type == CompactionActionType::SHOW_INFO) {
367
0
        std::string json_result;
368
0
        Status st = _handle_show_compaction(req, &json_result);
369
0
        if (!st.ok()) {
370
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
371
0
        } else {
372
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
373
0
        }
374
0
    } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) {
375
0
        std::string json_result;
376
0
        Status st = _handle_run_compaction(req, &json_result);
377
0
        if (!st.ok()) {
378
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
379
0
        } else {
380
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
381
0
        }
382
0
    } else {
383
0
        std::string json_result;
384
0
        Status st = _handle_run_status_compaction(req, &json_result);
385
0
        if (!st.ok()) {
386
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
387
0
        } else {
388
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
389
0
        }
390
0
    }
391
0
}
392
393
} // end namespace doris