Coverage Report

Created: 2026-06-02 17:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/compaction_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/compaction_action.h"
19
20
// IWYU pragma: no_include <bits/chrono.h>
21
#include <chrono> // IWYU pragma: keep
22
#include <exception>
23
#include <future>
24
#include <memory>
25
#include <mutex>
26
#include <sstream>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "absl/strings/substitute.h"
32
#include "common/logging.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/status.h"
35
#include "service/http/http_channel.h"
36
#include "service/http/http_headers.h"
37
#include "service/http/http_request.h"
38
#include "service/http/http_status.h"
39
#include "storage/compaction/base_compaction.h"
40
#include "storage/compaction/binlog_compaction.h"
41
#include "storage/compaction/cumulative_compaction.h"
42
#include "storage/compaction/cumulative_compaction_policy.h"
43
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
44
#include "storage/compaction/single_replica_compaction.h"
45
#include "storage/compaction_task_tracker.h"
46
#include "storage/olap_define.h"
47
#include "storage/storage_engine.h"
48
#include "storage/tablet/tablet_manager.h"
49
#include "util/stopwatch.hpp"
50
51
namespace doris {
52
using namespace ErrorCode;
53
54
namespace {
55
56
constexpr std::string_view HEADER_JSON = "application/json";
57
58
} // namespace
59
60
CompactionAction::CompactionAction(CompactionActionType ctype, ExecEnv* exec_env,
61
                                   StorageEngine& engine, TPrivilegeHier::type hier,
62
                                   TPrivilegeType::type ptype)
63
34
        : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine), _compaction_type(ctype) {}
64
65
/// check param and fetch tablet_id & table_id from req
66
12
static Status _check_param(HttpRequest* req, uint64_t* tablet_id, uint64_t* table_id) {
67
    // req tablet id and table id, we have to set only one of them.
68
12
    const auto& req_tablet_id = req->param(TABLET_ID_KEY);
69
12
    const auto& req_table_id = req->param(TABLE_ID_KEY);
70
12
    if (req_tablet_id.empty()) {
71
2
        if (req_table_id.empty()) {
72
            // both tablet id and table id are empty, return error.
73
1
            return Status::InternalError(
74
1
                    "tablet id and table id can not be empty at the same time!");
75
1
        } else {
76
1
            try {
77
1
                *table_id = std::stoull(req_table_id);
78
1
            } catch (const std::exception& e) {
79
0
                return Status::InternalError("convert table_id failed, {}", e.what());
80
0
            }
81
1
            return Status::OK();
82
1
        }
83
10
    } else if (req_table_id.empty()) {
84
9
        try {
85
9
            *tablet_id = std::stoull(req_tablet_id);
86
9
        } catch (const std::exception& e) {
87
1
            return Status::InternalError("convert tablet_id failed, {}", e.what());
88
1
        }
89
8
        return Status::OK();
90
9
    } else {
91
        // both tablet id and table id are not empty, return err.
92
1
        return Status::InternalError("tablet id and table id can not be set at the same time!");
93
1
    }
94
12
}
95
96
/// retrieve specific id from req
97
4
static Status _check_param(HttpRequest* req, uint64_t* id_param, const std::string param_name) {
98
4
    const auto& req_id_param = req->param(param_name);
99
4
    if (!req_id_param.empty()) {
100
2
        try {
101
2
            *id_param = std::stoull(req_id_param);
102
2
        } catch (const std::exception& e) {
103
0
            return Status::InternalError("convert {} failed, {}", param_name, e.what());
104
0
        }
105
2
    }
106
107
4
    return Status::OK();
108
4
}
109
110
// for viewing the compaction status
111
2
Status CompactionAction::_handle_show_compaction(HttpRequest* req, std::string* json_result) {
112
2
    uint64_t tablet_id = 0;
113
    // check & retrieve tablet_id from req if it contains
114
2
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
115
2
                                   "check param failed");
116
2
    if (tablet_id == 0) {
117
1
        return Status::InternalError("check param failed: missing tablet_id");
118
1
    }
119
120
1
    TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
121
1
    if (tablet == nullptr) {
122
1
        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
123
1
    }
124
125
0
    tablet->get_compaction_status(json_result);
126
0
    return Status::OK();
127
1
}
128
129
12
Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* json_result) {
130
    // 1. param check
131
    // check req_tablet_id or req_table_id is not empty and can not be set together.
132
12
    uint64_t tablet_id = 0;
133
12
    uint64_t table_id = 0;
134
12
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &table_id), "check param failed");
135
136
    // check compaction_type
137
9
    std::string compaction_type = req->param(PARAM_COMPACTION_TYPE);
138
9
    if (compaction_type != PARAM_COMPACTION_BASE &&
139
9
        compaction_type != PARAM_COMPACTION_CUMULATIVE &&
140
9
        compaction_type != PARAM_COMPACTION_FULL && compaction_type != PARAM_COMPACTION_BINLOG) {
141
1
        return Status::NotSupported("The compaction type '{}' is not supported", compaction_type);
142
1
    }
143
144
    // "remote" = "true" means tablet should do single replica compaction to fetch rowset from peer
145
8
    bool fetch_from_remote = false;
146
8
    std::string param_remote = req->param(PARAM_COMPACTION_REMOTE);
147
8
    if (param_remote == "true") {
148
0
        fetch_from_remote = true;
149
8
    } else if (!param_remote.empty() && param_remote != "false") {
150
1
        return Status::NotSupported("The remote = '{}' is not supported", param_remote);
151
1
    }
152
153
    // "force" = "true" means skip permit limiter when submitting full compaction to thread pool
154
7
    bool force = false;
155
7
    std::string param_force = req->param(PARAM_COMPACTION_FORCE);
156
7
    if (param_force == "true") {
157
2
        force = true;
158
5
    } else if (!param_force.empty() && param_force != "false") {
159
1
        return Status::NotSupported("The force = '{}' is not supported", param_force);
160
1
    }
161
162
6
    if (tablet_id == 0 && table_id != 0) {
163
1
        std::vector<TabletSharedPtr> tablet_vec = _engine.tablet_manager()->get_all_tablet(
164
1
                [table_id](Tablet* tablet) -> bool { return tablet->get_table_id() == table_id; });
165
1
        for (const auto& tablet : tablet_vec) {
166
0
            tablet->set_last_full_compaction_schedule_time(UnixMillis());
167
0
            RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION,
168
0
                                                           force, true, 1));
169
0
        }
170
5
    } else {
171
        // 2. fetch the tablet by tablet_id
172
5
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
173
5
        if (tablet == nullptr) {
174
5
            return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
175
5
        }
176
177
0
        if (fetch_from_remote && !tablet->should_fetch_from_peer()) {
178
0
            return Status::NotSupported("tablet should do compaction locally");
179
0
        }
180
0
        DBUG_EXECUTE_IF("CompactionAction._handle_run_compaction.submit_cumu_task", {
181
0
            RETURN_IF_ERROR(_engine.submit_compaction_task(
182
0
                    tablet, CompactionType::CUMULATIVE_COMPACTION, false));
183
0
            LOG(INFO) << "Manual debug compaction task is successfully triggered";
184
0
            *json_result =
185
0
                    R"({"status": "Success", "msg": "debug compaction task is successfully triggered. Table id: )" +
186
0
                    std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}";
187
0
            return Status::OK();
188
0
        })
189
190
0
        if (compaction_type == PARAM_COMPACTION_FULL) {
191
            // 3. submit full compaction task to thread pool
192
0
            tablet->set_last_full_compaction_schedule_time(UnixMillis());
193
0
            RETURN_IF_ERROR(_engine.submit_compaction_task(tablet, CompactionType::FULL_COMPACTION,
194
0
                                                           force, true, 1));
195
0
        } else {
196
            // 3. execute base/cumulative/binlog compaction task in a detached thread
197
0
            std::packaged_task<Status()> task([this, tablet, compaction_type, fetch_from_remote]() {
198
0
                return _execute_compaction_callback(tablet, compaction_type, fetch_from_remote);
199
0
            });
200
0
            std::future<Status> future_obj = task.get_future();
201
0
            std::thread(std::move(task)).detach();
202
203
0
            if (compaction_type == PARAM_COMPACTION_BASE) {
204
0
                tablet->set_last_base_compaction_schedule_time(UnixMillis());
205
0
            } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
206
0
                tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
207
0
            }
208
209
            // 4. wait for result for 2 seconds by async
210
0
            std::future_status status = future_obj.wait_for(std::chrono::seconds(2));
211
0
            if (status == std::future_status::ready) {
212
0
                Status olap_status = future_obj.get();
213
0
                if (!olap_status.ok()) {
214
0
                    return olap_status;
215
0
                }
216
0
            } else {
217
0
                LOG(INFO) << "Manual compaction task is timeout for waiting "
218
0
                          << (status == std::future_status::timeout);
219
0
            }
220
0
        }
221
0
    }
222
6
    LOG(INFO) << "Manual compaction task is successfully triggered";
223
1
    *json_result =
224
1
            R"({"status": "Success", "msg": "compaction task is successfully triggered. Table id: )" +
225
1
            std::to_string(table_id) + ". Tablet id: " + std::to_string(tablet_id) + "\"}";
226
1
    return Status::OK();
227
6
}
228
229
2
Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::string* json_result) {
230
2
    uint64_t tablet_id = 0;
231
    // check & retrieve tablet_id from req if it contains
232
2
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, TABLET_ID_KEY),
233
2
                                   "check param failed");
234
235
2
    if (tablet_id == 0) {
236
        // overall compaction status
237
1
        _engine.get_compaction_status_json(json_result);
238
1
        return Status::OK();
239
1
    } else {
240
        // fetch the tablet by tablet_id
241
1
        TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
242
1
        if (tablet == nullptr) {
243
1
            LOG(WARNING) << "invalid argument.tablet_id:" << tablet_id;
244
1
            return Status::InternalError("fail to get {}", tablet_id);
245
1
        }
246
247
0
        std::string json_template = R"({
248
0
            "status" : "Success",
249
0
            "run_status" : $0,
250
0
            "msg" : "$1",
251
0
            "tablet_id" : $2,
252
0
            "compact_type" : "$3"
253
0
        })";
254
255
0
        std::string msg = "compaction task for this tablet is not running";
256
0
        std::string compaction_type;
257
0
        bool run_status = false;
258
259
0
        {
260
            // Full compaction holds both base compaction lock and cumu compaction lock.
261
            // So we can not judge if full compaction is running by check these two locks holding.
262
            // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running.
263
0
            if (tablet->is_full_compaction_running()) {
264
0
                msg = "compaction task for this tablet is running";
265
0
                compaction_type = "full";
266
0
                run_status = true;
267
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
268
0
                                                compaction_type);
269
0
                return Status::OK();
270
0
            }
271
0
        }
272
273
0
        {
274
            // use try lock to check this tablet is running cumulative compaction
275
0
            std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(),
276
0
                                                         std::try_to_lock);
277
0
            if (!lock_cumulative.owns_lock()) {
278
0
                msg = "compaction task for this tablet is running";
279
0
                compaction_type = "cumulative";
280
0
                run_status = true;
281
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
282
0
                                                compaction_type);
283
0
                return Status::OK();
284
0
            }
285
0
        }
286
287
0
        {
288
            // use try lock to check this tablet is running binlog compaction
289
0
            std::unique_lock<std::mutex> lock_binlog(tablet->get_binlog_compaction_lock(),
290
0
                                                     std::try_to_lock);
291
0
            if (!lock_binlog.owns_lock()) {
292
0
                msg = "compaction task for this tablet is running";
293
0
                compaction_type = "binlog";
294
0
                run_status = true;
295
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
296
0
                                                compaction_type);
297
0
                return Status::OK();
298
0
            }
299
0
        }
300
301
0
        {
302
            // use try lock to check this tablet is running base compaction
303
0
            std::unique_lock<std::mutex> lock_base(tablet->get_base_compaction_lock(),
304
0
                                                   std::try_to_lock);
305
0
            if (!lock_base.owns_lock()) {
306
0
                msg = "compaction task for this tablet is running";
307
0
                compaction_type = "base";
308
0
                run_status = true;
309
0
                *json_result = absl::Substitute(json_template, run_status, msg, tablet_id,
310
0
                                                compaction_type);
311
0
                return Status::OK();
312
0
            }
313
0
        }
314
        // not running any compaction
315
0
        *json_result = absl::Substitute(json_template, run_status, msg, tablet_id, compaction_type);
316
0
        return Status::OK();
317
0
    }
318
2
}
319
320
Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
321
                                                      const std::string& compaction_type,
322
                                                      bool fetch_from_remote,
323
0
                                                      int8_t prefer_compaction_level) {
324
0
    MonotonicStopWatch timer;
325
0
    timer.start();
326
327
0
    std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
328
0
            CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
329
0
                    tablet->tablet_meta()->compaction_policy());
330
0
    if (tablet->get_cumulative_compaction_policy() == nullptr) {
331
0
        tablet->set_cumulative_compaction_policy(cumulative_compaction_policy);
332
0
    }
333
0
    Status res = Status::OK();
334
0
    auto* tracker = CompactionTaskTracker::instance();
335
0
    auto do_compact = [&](Compaction& compaction, CompactionProfileType profile_type) {
336
0
        RETURN_IF_ERROR(compaction.prepare_compact());
337
        // Register task as RUNNING with tracker (manual trigger, direct execution path)
338
        // Use compaction.compaction_id() which was allocated in constructor.
339
0
        int64_t compaction_id = compaction.compaction_id();
340
0
        {
341
0
            CompactionTaskInfo info;
342
0
            info.compaction_id = compaction_id;
343
0
            info.tablet_id = tablet->tablet_id();
344
0
            info.table_id = tablet->get_table_id();
345
0
            info.partition_id = tablet->partition_id();
346
0
            info.compaction_type = profile_type;
347
0
            info.status = CompactionTaskStatus::RUNNING;
348
0
            info.trigger_method = TriggerMethod::MANUAL;
349
0
            auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
350
0
                                  std::chrono::system_clock::now().time_since_epoch())
351
0
                                  .count();
352
0
            info.scheduled_time_ms = now_ms;
353
0
            info.start_time_ms = now_ms;
354
0
            info.compaction_score = tablet->get_real_compaction_score();
355
0
            info.input_rowsets_count = compaction.input_rowsets_count();
356
0
            info.input_row_num = compaction.input_row_num_value();
357
0
            info.input_data_size = compaction.input_rowsets_data_size();
358
0
            info.input_index_size = compaction.input_rowsets_index_size();
359
0
            info.input_total_size = compaction.input_rowsets_total_size();
360
0
            info.input_segments_num = compaction.input_segments_num_value();
361
0
            info.input_version_range = compaction.input_version_range_str();
362
0
            info.is_vertical = compaction.is_vertical();
363
0
            info.backend_id = BackendOptions::get_backend_id();
364
0
            tracker->register_task(std::move(info));
365
0
        }
366
0
        auto st = compaction.execute_compact();
367
        // Idempotent cleanup: if execute returned early (e.g. TRY_LOCK_FAILED)
368
        // before submit_profile_record was called, task remains in active_tasks.
369
0
        tracker->remove_task(compaction_id);
370
0
        return st;
371
0
    };
372
0
    if (compaction_type == PARAM_COMPACTION_BASE) {
373
0
        BaseCompaction base_compaction(_engine, tablet);
374
0
        res = do_compact(base_compaction, CompactionProfileType::BASE);
375
0
        if (!res) {
376
0
            if (!res.is<BE_NO_SUITABLE_VERSION>()) {
377
0
                DorisMetrics::instance()->base_compaction_request_failed->increment(1);
378
0
            }
379
0
        }
380
0
    } else if (compaction_type == PARAM_COMPACTION_CUMULATIVE) {
381
0
        if (fetch_from_remote) {
382
0
            SingleReplicaCompaction single_compaction(_engine, tablet,
383
0
                                                      CompactionType::CUMULATIVE_COMPACTION);
384
0
            res = do_compact(single_compaction, CompactionProfileType::CUMULATIVE);
385
0
            if (!res) {
386
0
                LOG(WARNING) << "failed to do single compaction. res=" << res
387
0
                             << ", table=" << tablet->tablet_id();
388
0
            }
389
0
        } else {
390
0
            CumulativeCompaction cumulative_compaction(_engine, tablet);
391
0
            res = do_compact(cumulative_compaction, CompactionProfileType::CUMULATIVE);
392
0
            if (!res) {
393
0
                if (res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) {
394
                    // Ignore this error code.
395
0
                    VLOG_NOTICE
396
0
                            << "failed to init cumulative compaction due to no suitable version,"
397
0
                            << "tablet=" << tablet->tablet_id();
398
0
                } else {
399
0
                    DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1);
400
0
                    LOG(WARNING) << "failed to do cumulative compaction. res=" << res
401
0
                                 << ", table=" << tablet->tablet_id();
402
0
                }
403
0
            }
404
0
        }
405
0
    } else if (compaction_type == PARAM_COMPACTION_BINLOG) {
406
0
        if (prefer_compaction_level < 0) {
407
0
            tablet->calc_compaction_score(CompactionType::BINLOG_COMPACTION,
408
0
                                          &prefer_compaction_level);
409
0
        }
410
0
        if (prefer_compaction_level < 0) {
411
0
            res = Status::Error<BINLOG_COMPACTION_NO_SUITABLE_VERSION>(
412
0
                    "failed to init binlog compaction due to no suitable version");
413
0
        } else {
414
0
            BinlogCompaction binlog_compaction(_engine, tablet, prefer_compaction_level);
415
0
            res = do_compact(binlog_compaction, CompactionProfileType::BINLOG);
416
0
            if (!res) {
417
0
                if (res.is<BINLOG_COMPACTION_NO_SUITABLE_VERSION>()) {
418
0
                    VLOG_NOTICE << "failed to init binlog compaction due to no suitable version,"
419
0
                                << "tablet=" << tablet->tablet_id() << ", compaction_level="
420
0
                                << static_cast<int>(prefer_compaction_level);
421
0
                } else {
422
0
                    LOG(WARNING) << "failed to do binlog compaction. res=" << res
423
0
                                 << ", table=" << tablet->tablet_id() << ", compaction_level="
424
0
                                 << static_cast<int>(prefer_compaction_level);
425
0
                }
426
0
            }
427
0
        }
428
0
    }
429
0
    timer.stop();
430
0
    LOG(INFO) << "Manual compaction task finish, status=" << res
431
0
              << ", compaction_use_time=" << timer.elapsed_time() / 1000000 << "ms";
432
0
    return res;
433
0
}
434
435
0
void CompactionAction::handle(HttpRequest* req) {
436
0
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
437
438
0
    if (_compaction_type == CompactionActionType::SHOW_INFO) {
439
0
        std::string json_result;
440
0
        Status st = _handle_show_compaction(req, &json_result);
441
0
        if (!st.ok()) {
442
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
443
0
        } else {
444
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
445
0
        }
446
0
    } else if (_compaction_type == CompactionActionType::RUN_COMPACTION) {
447
0
        std::string json_result;
448
0
        Status st = _handle_run_compaction(req, &json_result);
449
0
        if (!st.ok()) {
450
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
451
0
        } else {
452
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
453
0
        }
454
0
    } else {
455
0
        std::string json_result;
456
0
        Status st = _handle_run_status_compaction(req, &json_result);
457
0
        if (!st.ok()) {
458
0
            HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
459
0
        } else {
460
0
            HttpChannel::send_reply(req, HttpStatus::OK, json_result);
461
0
        }
462
0
    }
463
0
}
464
465
} // end namespace doris