Coverage Report

Created: 2026-04-15 12:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/tablet_migration_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/tablet_migration_action.h"
19
20
#include <glog/logging.h>
21
22
#include <exception>
23
#include <string>
24
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "service/http/http_channel.h"
28
#include "service/http/http_headers.h"
29
#include "service/http/http_request.h"
30
#include "service/http/http_status.h"
31
#include "storage/data_dir.h"
32
#include "storage/storage_engine.h"
33
#include "storage/tablet/tablet_manager.h"
34
#include "storage/task/engine_storage_migration_task.h"
35
36
namespace doris {
37
// Content-Type value attached to every reply produced by this action.
static const std::string HEADER_JSON("application/json");
38
39
0
void TabletMigrationAction::_init_migration_action() {
40
0
    int32_t max_thread_num = config::max_tablet_migration_threads;
41
0
    int32_t min_thread_num = config::min_tablet_migration_threads;
42
0
    THROW_IF_ERROR(ThreadPoolBuilder("MigrationTaskThreadPool")
43
0
                           .set_min_threads(min_thread_num)
44
0
                           .set_max_threads(max_thread_num)
45
0
                           .build(&_migration_thread_pool));
46
0
}
47
48
0
// Handles a tablet-migration HTTP request.
//
// The query string is validated by _check_param(); the request is then dispatched on `goal`:
//   - "run":    the migration is validated by _check_migrate_request(), recorded in
//               _migration_tasks as "submitted" and queued on _migration_thread_pool. The
//               queued closure flips the entry to "running", executes the migration, removes
//               the entry and appends the outcome to _finished_migration_tasks.
//   - "status": reports the state of a matching in-flight task from _migration_tasks, or else
//               the outcome of the most recently finished matching task.
//
// Every path replies with HttpStatus::OK and a JSON body; failures are reported inside the
// body through Status::to_json().
void TabletMigrationAction::handle(HttpRequest* req) {
    int64_t tablet_id = 0;
    unsigned long schema_hash = 0;
    std::string dest_disk;
    std::string goal;
    // Filled in by the success paths below; any non-OK `status` overrides it before replying.
    std::string status_result;

    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);
    if (!status.ok()) {
        // Invalid parameters: nothing to do, the error is serialized at the end.
    } else if (goal == "run") {
        MigrationTask current_task(tablet_id, schema_hash, dest_disk);
        TabletSharedPtr tablet;
        DataDir* dest_store = nullptr;
        status = _check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store);
        if (status.ok()) {
            // Register the task first so that a concurrent request for the same task is
            // rejected; only submit to the pool when the registration succeeded.
            bool registered = false;
            {
                std::unique_lock<std::mutex> lock(_migration_status_mutex);
                auto it_task = _migration_tasks.find(current_task);
                if (it_task != _migration_tasks.end()) {
                    status = Status::AlreadyExist(
                            "There is a migration task for this tablet already exists. "
                            "dest_disk is {} .",
                            (it_task->first)._dest_disk);
                } else {
                    _migration_tasks[current_task] = "submitted";
                    registered = true;
                }
            }

            if (registered) {
                // The closure runs later on a pool thread, after handle() has returned, so it
                // captures `this` explicitly and everything else by value: no stack local of
                // this function may be reachable by reference from inside it.
                auto st = _migration_thread_pool->submit_func([this, tablet, dest_store,
                                                               current_task]() {
                    {
                        std::unique_lock<std::mutex> lock(_migration_status_mutex);
                        _migration_tasks[current_task] = "running";
                    }
                    Status result_status = _execute_tablet_migration(tablet, dest_store);
                    {
                        std::unique_lock<std::mutex> lock(_migration_status_mutex);
                        _migration_tasks.erase(current_task);
                        // Keep the history bounded. The emptiness check matters when the
                        // configured limit is 0: popping from an empty container is undefined
                        // behaviour.
                        if (!_finished_migration_tasks.empty() &&
                            _finished_migration_tasks.size() >=
                                    config::finished_migration_tasks_size) {
                            _finished_migration_tasks.pop_front();
                        }
                        _finished_migration_tasks.push_back(
                                std::make_pair(current_task, result_status));
                    }
                });
                if (!st.ok()) {
                    status = Status::InternalError("Migration task submission failed");
                    // Roll back the registration so the task can be submitted again.
                    std::unique_lock<std::mutex> lock(_migration_status_mutex);
                    _migration_tasks.erase(current_task);
                }
            }
        }
        if (status.ok()) {
            status_result =
                    "{\"status\": \"Success\", \"msg\": \"migration task is successfully "
                    "submitted.\"}";
        }
    } else {
        DCHECK(goal == "status");
        MigrationTask current_task(tablet_id, schema_hash);
        // The lock is scoped so that it is released before the reply is serialized and sent.
        std::unique_lock<std::mutex> lock(_migration_status_mutex);
        auto it_task = _migration_tasks.find(current_task);
        if (it_task != _migration_tasks.end()) {
            // Still in flight: report "submitted" / "running" together with the target disk.
            status_result = "{\"status\": \"Success\", \"msg\": \"migration task is " +
                            it_task->second + "\", \"dest_disk\": \"" +
                            (it_task->first)._dest_disk + "\"}";
        } else {
            // Walk the history from newest to oldest so the latest outcome wins.
            bool found = false;
            for (auto it = _finished_migration_tasks.rbegin();
                 it != _finished_migration_tasks.rend(); ++it) {
                const MigrationTask& finished_task = it->first;
                if (finished_task._tablet_id == tablet_id &&
                    finished_task._schema_hash == schema_hash) {
                    status = it->second;
                    if (status.ok()) {
                        status_result =
                                "{\"status\": \"Success\", \"msg\": \"migration task has "
                                "finished successfully\", \"dest_disk\": \"" +
                                finished_task._dest_disk + "\"}";
                    }
                    found = true;
                    break;
                }
            }
            if (!found) {
                status = Status::NotFound("Migration task not found");
            }
        }
    }

    // Single exit: any failure recorded above replaces the body with its JSON form.
    if (!status.ok()) {
        status_result = status.to_json();
    }
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
    HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
165
166
// Extracts and validates the query parameters of a migration request.
//
// Reads "tablet_id", "schema_hash", "disk" and "goal" from `req` and stores them in the
// corresponding output arguments. `dest_disk` and `goal` are only written once both numeric
// parameters have been parsed.
//
// Returns Status::OK() on success, or Status::InternalError when a numeric parameter cannot
// be converted or when `goal` is neither "run" nor "status".
Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id,
                                           unsigned long& schema_hash, std::string& dest_disk,
                                           std::string& goal) {
    const std::string& req_tablet_id = req->param("tablet_id");
    const std::string& req_schema_hash = req->param("schema_hash");
    try {
        // tablet_id is signed, so use the signed conversion: values that do not fit in a
        // signed 64-bit integer now raise std::out_of_range and are reported below, instead
        // of being parsed as unsigned and silently wrapping to a negative id.
        tablet_id = std::stoll(req_tablet_id);
        schema_hash = std::stoul(req_schema_hash);
    } catch (const std::exception& e) {
        return Status::InternalError(
                "Convert failed:{}, invalid argument.tablet_id: {}, schema_hash: {}", e.what(),
                req_tablet_id, req_schema_hash);
    }
    dest_disk = req->param("disk");
    goal = req->param("goal");
    if (goal != "run" && goal != "status") {
        return Status::InternalError("invalid goal argument.");
    }
    return Status::OK();
}
186
187
// Checks whether the tablet can be migrated to the requested disk.
//
// On return `tablet` holds the result of the tablet lookup and `*dest_store` the result of
// the data-dir lookup (either may be null when the corresponding lookup failed).
// `schema_hash` is accepted but not consulted by the checks in this function.
//
// Returns:
//   - NotFound        when the tablet or the destination data dir does not exist,
//   - AlreadyExist    when the tablet already lives on the destination data dir,
//   - EXCEEDED_LIMIT  when the destination reports it would reach its capacity limit,
//   - OK              otherwise.
Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, unsigned long schema_hash,
                                                     std::string dest_disk, TabletSharedPtr& tablet,
                                                     DataDir** dest_store) {
    // Step 1: the tablet must exist.
    tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        LOG(WARNING) << "no tablet for tablet_id:" << tablet_id;
        return Status::NotFound("Tablet not found");
    }

    // Step 2: the data dir named in the request must exist. The out-parameter is written
    // right away so the caller sees the lookup result even when a later check fails.
    DataDir* const target_store = _engine.get_store(dest_disk);
    *dest_store = target_store;
    if (!target_store) {
        LOG(WARNING) << "data dir not found: " << dest_disk;
        return Status::NotFound("Disk not found");
    }

    // Step 3: migrating onto the disk the tablet already lives on is rejected.
    if (target_store == tablet->data_dir()) {
        LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
        return Status::AlreadyExist("Tablet already exist in destination disk");
    }

    // Step 4: the destination must have room for the tablet's local data.
    const int64_t required_bytes = tablet->tablet_local_size();
    if (target_store->reach_capacity_limit(required_bytes)) {
        LOG(WARNING) << "reach the capacity limit of path: " << target_store->path()
                     << ", tablet size: " << required_bytes;
        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
                "reach the capacity limit of path {}, tablet_size={}", target_store->path(),
                required_bytes);
    }

    return Status::OK();
}
220
221
// Runs an EngineStorageMigrationTask that moves `tablet` to `dest_store` and returns the
// task's status. The outcome is logged either way: INFO on success, WARNING (including the
// status) on failure.
Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
                                                        DataDir* dest_store) {
    // Snapshot the identifying fields before the task runs; they are only used for logging.
    const int64_t migrated_tablet_id = tablet->tablet_id();
    const int32_t migrated_schema_hash = tablet->schema_hash();
    const std::string target_path = dest_store->path();

    EngineStorageMigrationTask migration(_engine, tablet, dest_store);
    SCOPED_ATTACH_TASK(migration.mem_tracker());
    Status outcome = migration.execute();

    if (outcome.ok()) {
        LOG(INFO) << "tablet migrate success. tablet_id=" << migrated_tablet_id
                  << ", schema_hash=" << migrated_schema_hash << ", dest_disk=" << target_path;
    } else {
        LOG(WARNING) << "tablet migrate failed. tablet_id=" << migrated_tablet_id
                     << ", schema_hash=" << migrated_schema_hash << ", dest_disk=" << target_path
                     << ", status:" << outcome;
    }
    return outcome;
}
239
} // namespace doris