Coverage Report

Created: 2026-03-13 19:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/tablet_migration_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/tablet_migration_action.h"
19
20
#include <glog/logging.h>
21
22
#include <exception>
23
#include <string>
24
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "service/http/http_channel.h"
28
#include "service/http/http_headers.h"
29
#include "service/http/http_request.h"
30
#include "service/http/http_status.h"
31
#include "storage/data_dir.h"
32
#include "storage/storage_engine.h"
33
#include "storage/tablet/tablet_manager.h"
34
#include "storage/task/engine_storage_migration_task.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
// Content-Type header value attached to the JSON replies sent by TabletMigrationAction::handle().
const static std::string HEADER_JSON = "application/json";
39
40
0
void TabletMigrationAction::_init_migration_action() {
41
0
    int32_t max_thread_num = config::max_tablet_migration_threads;
42
0
    int32_t min_thread_num = config::min_tablet_migration_threads;
43
0
    THROW_IF_ERROR(ThreadPoolBuilder("MigrationTaskThreadPool")
44
0
                           .set_min_threads(min_thread_num)
45
0
                           .set_max_threads(max_thread_num)
46
0
                           .build(&_migration_thread_pool));
47
0
}
48
49
0
// HTTP entry point for tablet migration requests.
//
// The request parameters are parsed by _check_param(); `goal` then selects the operation:
//   - "run":    validate the request with _check_migrate_request(), register the task in
//               _migration_tasks and submit it to _migration_thread_pool.
//   - "status": report the state of a registered task from _migration_tasks or, if there is
//               none, of the most recently finished matching task in
//               _finished_migration_tasks.
//
// The reply is always sent with HttpStatus::OK and a JSON body; any failure is reported in
// the body via Status::to_json().
void TabletMigrationAction::handle(HttpRequest* req) {
    int64_t tablet_id = 0;
    unsigned long schema_hash = 0;
    std::string dest_disk;
    std::string goal;
    Status status = _check_param(req, tablet_id, schema_hash, dest_disk, goal);

    std::string status_result;
    if (status.ok() && goal == "run") {
        MigrationTask current_task(tablet_id, schema_hash, dest_disk);
        TabletSharedPtr tablet;
        DataDir* dest_store = nullptr;
        status = _check_migrate_request(tablet_id, schema_hash, dest_disk, tablet, &dest_store);
        if (status.ok()) {
            // Register the task first so that a concurrent request for the same task is
            // rejected while this one is pending.
            bool registered = false;
            {
                std::unique_lock<std::mutex> lock(_migration_status_mutex);
                auto it_task = _migration_tasks.find(current_task);
                if (it_task != _migration_tasks.end()) {
                    status = Status::AlreadyExist(
                            "There is a migration task for this tablet already exists. "
                            "dest_disk is {} .",
                            (it_task->first)._dest_disk);
                } else {
                    _migration_tasks[current_task] = "submitted";
                    registered = true;
                }
            }
            if (registered) {
                // The task may run after handle() has returned, so it captures `this` and
                // copies only; nothing on this stack frame is captured by reference.
                auto st = _migration_thread_pool->submit_func([this, tablet, dest_store,
                                                               current_task]() {
                    {
                        std::unique_lock<std::mutex> lock(_migration_status_mutex);
                        _migration_tasks[current_task] = "running";
                    }
                    // The migration itself runs without holding the status mutex.
                    Status result_status = _execute_tablet_migration(tablet, dest_store);
                    {
                        std::unique_lock<std::mutex> lock(_migration_status_mutex);
                        _migration_tasks.erase(current_task);
                        // Keep the finished-task history bounded: drop the oldest entry
                        // once the configured size is reached.
                        if (_finished_migration_tasks.size() >=
                            config::finished_migration_tasks_size) {
                            _finished_migration_tasks.pop_front();
                        }
                        _finished_migration_tasks.push_back(
                                std::make_pair(current_task, result_status));
                    }
                });
                if (!st.ok()) {
                    status = Status::InternalError("Migration task submission failed");
                    // Roll back the registration made above.
                    std::unique_lock<std::mutex> lock(_migration_status_mutex);
                    _migration_tasks.erase(current_task);
                }
            }
        }
        if (status.ok()) {
            status_result =
                    "{\"status\": \"Success\", \"msg\": \"migration task is successfully "
                    "submitted.\"}";
        }
    } else if (status.ok()) {
        DCHECK(goal == "status");
        MigrationTask current_task(tablet_id, schema_hash);
        // The lock is scoped so that it is released before the reply is built and sent.
        std::unique_lock<std::mutex> lock(_migration_status_mutex);
        auto it_task = _migration_tasks.find(current_task);
        if (it_task != _migration_tasks.end()) {
            status_result = "{\"status\": \"Success\", \"msg\": \"migration task is " +
                            it_task->second + "\", \"dest_disk\": \"" +
                            (it_task->first)._dest_disk + "\"}";
        } else {
            // Scan the history from the newest entry backwards so that the latest result
            // for this tablet is the one reported.
            int i = cast_set<int>(_finished_migration_tasks.size()) - 1;
            for (; i >= 0; i--) {
                const MigrationTask& finished_task = _finished_migration_tasks[i].first;
                if (finished_task._tablet_id == tablet_id &&
                    finished_task._schema_hash == schema_hash) {
                    status = _finished_migration_tasks[i].second;
                    if (status.ok()) {
                        status_result =
                                "{\"status\": \"Success\", \"msg\": \"migration task has "
                                "finished successfully\", \"dest_disk\": \"" +
                                finished_task._dest_disk + "\"}";
                    }
                    break;
                }
            }
            if (i < 0) {
                status = Status::NotFound("Migration task not found");
            }
        }
    }

    // Every failure path (bad parameters, rejected request, failed or unknown task) is
    // reported as the JSON form of `status`.
    if (!status.ok()) {
        status_result = status.to_json();
    }
    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
    HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
166
167
// Extracts and validates the parameters of a tablet migration request.
//
// Reads "tablet_id", "schema_hash", "disk" and "goal" from `req` and stores them in the
// corresponding out-parameters. Both numeric parameters must parse in full as base-10
// integers: a value that does not parse, is out of range for its type, or is followed by
// extra characters is rejected.
//
// Returns Status::OK() on success, or Status::InternalError when a numeric parameter is
// malformed or `goal` is neither "run" nor "status". On failure the out-parameters may be
// partially assigned.
Status TabletMigrationAction::_check_param(HttpRequest* req, int64_t& tablet_id,
                                           unsigned long& schema_hash, std::string& dest_disk,
                                           std::string& goal) {
    const std::string& req_tablet_id = req->param("tablet_id");
    const std::string& req_schema_hash = req->param("schema_hash");
    size_t tablet_id_len = 0;
    size_t schema_hash_len = 0;
    try {
        // tablet_id is signed, so it is parsed as signed: std::stoll throws
        // std::out_of_range for values above INT64_MAX instead of letting them wrap to a
        // negative id.
        tablet_id = std::stoll(req_tablet_id, &tablet_id_len);
        schema_hash = std::stoul(req_schema_hash, &schema_hash_len);
    } catch (const std::exception& e) {
        return Status::InternalError(
                "Convert failed:{}, invalid argument.tablet_id: {}, schema_hash: {}", e.what(),
                req_tablet_id, req_schema_hash);
    }
    // The conversions stop at the first non-digit, so "123abc" would otherwise be accepted
    // as 123. Require that the whole parameter was consumed.
    if (tablet_id_len != req_tablet_id.size() || schema_hash_len != req_schema_hash.size()) {
        return Status::InternalError(
                "Convert failed:{}, invalid argument.tablet_id: {}, schema_hash: {}",
                "trailing characters", req_tablet_id, req_schema_hash);
    }
    dest_disk = req->param("disk");
    goal = req->param("goal");
    if (goal != "run" && goal != "status") {
        return Status::InternalError("invalid goal argument.");
    }
    return Status::OK();
}
187
188
// Validates a "run" request before a migration task is created.
//
// Looks up the tablet by `tablet_id` and the destination data dir by `dest_disk`, handing
// them back through `tablet` and `*dest_store`. The request is rejected when the tablet or
// the disk cannot be found, when the tablet's current data dir is already the destination,
// or when the destination reports that the tablet's local size would reach its capacity
// limit. `schema_hash` is not consulted by this check.
Status TabletMigrationAction::_check_migrate_request(int64_t tablet_id, unsigned long schema_hash,
                                                     std::string dest_disk, TabletSharedPtr& tablet,
                                                     DataDir** dest_store) {
    tablet = _engine.tablet_manager()->get_tablet(tablet_id);
    if (!tablet) {
        LOG(WARNING) << "no tablet for tablet_id:" << tablet_id;
        return Status::NotFound("Tablet not found");
    }

    // Resolve the data dir named by the request. The caller's pointer is written even when
    // the lookup comes back empty.
    DataDir* const target_dir = _engine.get_store(dest_disk);
    *dest_store = target_dir;
    if (target_dir == nullptr) {
        LOG(WARNING) << "data dir not found: " << dest_disk;
        return Status::NotFound("Disk not found");
    }

    // Nothing to migrate if the tablet already lives on the requested data dir.
    if (tablet->data_dir() == target_dir) {
        LOG(WARNING) << "tablet already exist in destine disk: " << dest_disk;
        return Status::AlreadyExist("Tablet already exist in destination disk");
    }

    // Make sure the destination can take the tablet's local data.
    const int64_t local_size = tablet->tablet_local_size();
    if (target_dir->reach_capacity_limit(local_size)) {
        LOG(WARNING) << "reach the capacity limit of path: " << target_dir->path()
                     << ", tablet size: " << local_size;
        return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
                "reach the capacity limit of path {}, tablet_size={}", target_dir->path(),
                local_size);
    }

    return Status::OK();
}
221
222
// Migrates `tablet` to `dest_store` by executing an EngineStorageMigrationTask and returns
// the Status that the task's execute() produced.
//
// The outcome is logged together with the tablet id, schema hash and destination path:
// at INFO level on success, at WARNING level (including the status) on failure.
Status TabletMigrationAction::_execute_tablet_migration(TabletSharedPtr tablet,
                                                        DataDir* dest_store) {
    // Read the values used in the log lines before the task is executed.
    const int64_t migrated_tablet_id = tablet->tablet_id();
    const int32_t migrated_schema_hash = tablet->schema_hash();
    const std::string target_path = dest_store->path();

    EngineStorageMigrationTask migration(_engine, tablet, dest_store);
    SCOPED_ATTACH_TASK(migration.mem_tracker());
    Status outcome = migration.execute();
    if (outcome.ok()) {
        LOG(INFO) << "tablet migrate success. tablet_id=" << migrated_tablet_id
                  << ", schema_hash=" << migrated_schema_hash << ", dest_disk=" << target_path;
    } else {
        LOG(WARNING) << "tablet migrate failed. tablet_id=" << migrated_tablet_id
                     << ", schema_hash=" << migrated_schema_hash << ", dest_disk=" << target_path
                     << ", status:" << outcome;
    }
    return outcome;
}
240
#include "common/compile_check_end.h"
241
} // namespace doris