Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/download_binlog_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/download_binlog_action.h"
19
20
#include <fmt/format.h>
21
#include <fmt/ranges.h>
22
23
#include <charconv>
#include <cstdint>
#include <stdexcept>
#include <string_view>
#include <utility>
27
28
#include "common/config.h"
29
#include "common/logging.h"
30
#include "io/fs/local_file_system.h"
31
#include "runtime/exec_env.h"
32
#include "service/http/http_channel.h"
33
#include "service/http/http_request.h"
34
#include "service/http/utils.h"
35
#include "storage/storage_engine.h"
36
#include "storage/tablet/tablet_manager.h"
37
#include "util/stopwatch.hpp"
38
39
namespace doris {
40
41
namespace {
42
// Names of the URL query parameters read by DownloadBinlogAction and its per-method handlers.
const std::string kMethodParameter{"method"};                  // which handler to dispatch to
const std::string kTokenParameter{"token"};                    // read by _check_token
const std::string kTabletIdParameter{"tablet_id"};
const std::string kBinlogVersionParameter{"binlog_version"};
const std::string kRowsetIdParameter{"rowset_id"};
const std::string kSegmentIndexParameter{"segment_index"};
const std::string kSegmentIndexIdParameter{"segment_index_id"};
const std::string kAcquireMD5Parameter{"acquire_md5"};         // non-empty => also request MD5
50
51
// bvar latency recorders, one per dispatched method, all named under the same prefix.
// handle() feeds each one the elapsed microseconds of the corresponding handler.
constexpr const char* kBinlogLatencyPrefix = "doris_download_binlog";
bvar::LatencyRecorder g_get_binlog_info_latency(kBinlogLatencyPrefix, "get_binlog_info");
bvar::LatencyRecorder g_get_segment_file_latency(kBinlogLatencyPrefix, "get_segment_file");
bvar::LatencyRecorder g_get_segment_index_file_latency(kBinlogLatencyPrefix,
                                                       "get_segment_index_file");
bvar::LatencyRecorder g_get_rowset_meta_latency(kBinlogLatencyPrefix, "get_rowset_meta");
56
57
// get http param, if no value throw exception
58
0
const auto& get_http_param(HttpRequest* req, const std::string& param_name) {
59
0
    const auto& param = req->param(param_name);
60
0
    if (param.empty()) {
61
0
        auto error_msg = fmt::format("parameter {} not specified in url.", param_name);
62
0
        throw std::runtime_error(error_msg);
63
0
    }
64
0
    return param;
65
0
}
66
67
0
// Parses `tablet_id_str` as a decimal int64 tablet id and fetches that tablet from `engine`.
// Throws std::runtime_error (after logging a warning) when the id is not a well-formed integer
// or when the tablet manager has no tablet with that id.
auto get_tablet(StorageEngine& engine, const std::string& tablet_id_str) {
    int64_t tablet_id = 0;
    const char* const begin = tablet_id_str.data();
    const char* const end = begin + tablet_id_str.size();
    // Strict parse: std::atoll would map non-numeric input to 0 and silently drop trailing
    // garbage ("123abc" -> 123), so a malformed request could address a different tablet.
    const auto [parsed_end, ec] = std::from_chars(begin, end, tablet_id);
    if (ec != std::errc() || parsed_end != end) {
        auto error = fmt::format("invalid tablet_id: {}", tablet_id_str);
        LOG(WARNING) << error;
        throw std::runtime_error(error);
    }

    TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
    if (tablet == nullptr) {
        auto error = fmt::format("tablet is not exist, tablet_id={}", tablet_id);
        LOG(WARNING) << error;
        throw std::runtime_error(error);
    }

    return tablet;
}
79
80
// need binlog_version, tablet_id
81
0
void handle_get_binlog_info(StorageEngine& engine, HttpRequest* req) {
82
0
    try {
83
0
        const auto& binlog_version = get_http_param(req, kBinlogVersionParameter);
84
0
        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
85
0
        auto tablet = get_tablet(engine, tablet_id);
86
87
0
        const auto& [rowset_id, num_segments] = tablet->get_binlog_info(binlog_version);
88
0
        if (rowset_id.empty()) {
89
0
            HttpChannel::send_reply(
90
0
                    req, HttpStatus::NOT_FOUND,
91
0
                    fmt::format("get binlog info failed, binlog_version={}", binlog_version));
92
0
        } else if (num_segments < 0) {
93
0
            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
94
0
                                    fmt::format("invalid num_segments: {}", num_segments));
95
0
        } else {
96
0
            auto binlog_info_msg = fmt::format("{}:{}", rowset_id, num_segments);
97
0
            HttpChannel::send_reply(req, binlog_info_msg);
98
0
        }
99
0
    } catch (const std::exception& e) {
100
0
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
101
0
        LOG(WARNING) << "get binlog info failed, error: " << e.what();
102
0
        return;
103
0
    }
104
0
}
105
106
/// handle get segment file, need tablet_id, rowset_id && index
107
void handle_get_segment_file(StorageEngine& engine, HttpRequest* req,
108
0
                             bufferevent_rate_limit_group* rate_limit_group) {
109
    // Step 1: get download file path
110
0
    std::string segment_file_path;
111
0
    bool is_acquire_md5 = false;
112
0
    try {
113
0
        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
114
0
        auto tablet = get_tablet(engine, tablet_id);
115
0
        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
116
0
        const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
117
0
        segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
118
0
        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
119
0
    } catch (const std::exception& e) {
120
0
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
121
0
        LOG(WARNING) << "get download file path failed, error: " << e.what();
122
0
        return;
123
0
    }
124
125
    // Step 2: handle download
126
    // check file exists
127
0
    bool exists = false;
128
0
    Status status = io::global_local_filesystem()->exists(segment_file_path, &exists);
129
0
    if (!status.ok()) {
130
0
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
131
0
        LOG(WARNING) << "check file exists failed, error: " << status.to_string();
132
0
        return;
133
0
    }
134
0
    if (!exists) {
135
0
        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
136
0
        LOG(WARNING) << "file not exist, file path: " << segment_file_path;
137
0
        return;
138
0
    }
139
0
    do_file_response(segment_file_path, req, rate_limit_group, is_acquire_md5);
140
0
}
141
142
/// handle get segment index file, need tablet_id, rowset_id, segment_index && segment_index_id
143
void handle_get_segment_index_file(StorageEngine& engine, HttpRequest* req,
144
0
                                   bufferevent_rate_limit_group* rate_limit_group) {
145
    // Step 1: get download file path
146
0
    std::string segment_index_file_path;
147
0
    bool is_acquire_md5 = false;
148
0
    try {
149
0
        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
150
0
        auto tablet = get_tablet(engine, tablet_id);
151
0
        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
152
0
        const auto& segment_index = get_http_param(req, kSegmentIndexParameter);
153
0
        const auto& segment_index_id = req->param(kSegmentIndexIdParameter);
154
0
        auto segment_file_path = tablet->get_segment_filepath(rowset_id, segment_index);
155
0
        if (tablet->tablet_schema()->get_inverted_index_storage_format() ==
156
0
            InvertedIndexStorageFormatPB::V1) {
157
            // now CCR not support for variant + index v1
158
0
            constexpr std::string_view index_suffix = "";
159
0
            segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
160
0
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path),
161
0
                    std::stoll(segment_index_id), index_suffix);
162
0
        } else {
163
0
            DCHECK(segment_index_id == "-1");
164
0
            segment_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
165
0
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_file_path));
166
0
        }
167
0
        is_acquire_md5 = !req->param(kAcquireMD5Parameter).empty();
168
0
    } catch (const std::exception& e) {
169
0
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
170
0
        LOG(WARNING) << "get download file path failed, error: " << e.what();
171
0
        return;
172
0
    }
173
174
    // Step 2: handle download
175
    // check file exists
176
0
    bool exists = false;
177
0
    Status status = io::global_local_filesystem()->exists(segment_index_file_path, &exists);
178
0
    if (!status.ok()) {
179
0
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
180
0
        LOG(WARNING) << "check file exists failed, error: " << status.to_string();
181
0
        return;
182
0
    }
183
0
    if (!exists) {
184
0
        HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "file not exist.");
185
0
        LOG(WARNING) << "file not exist, file path: " << segment_index_file_path;
186
0
        return;
187
0
    }
188
0
    do_file_response(segment_index_file_path, req, rate_limit_group, is_acquire_md5);
189
0
}
190
191
0
// Handles `method=get_rowset_meta`. Required params: tablet_id, rowset_id, binlog_version.
// Replies with the meta string returned by get_rowset_binlog_meta, NOT_FOUND when that string
// is empty, and INTERNAL_SERVER_ERROR when any step throws.
void handle_get_rowset_meta(StorageEngine& engine, HttpRequest* req) {
    try {
        const auto& tablet_id = get_http_param(req, kTabletIdParameter);
        auto tablet = get_tablet(engine, tablet_id);
        const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
        const auto& binlog_version = get_http_param(req, kBinlogVersionParameter);
        auto rowset_meta = tablet->get_rowset_binlog_meta(binlog_version, rowset_id);
        if (rowset_meta.empty()) {
            HttpChannel::send_reply(req, HttpStatus::NOT_FOUND,
                                    fmt::format("get rowset meta failed, rowset_id={}", rowset_id));
        } else {
            HttpChannel::send_reply(req, rowset_meta);
        }
    } catch (const std::exception& e) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, e.what());
        // Name the operation that actually failed so these warnings are not confused with the
        // segment-file handlers' "get download file path failed" logs.
        LOG(WARNING) << "get rowset meta failed, error: " << e.what();
    }
}
209
210
} // namespace
211
212
DownloadBinlogAction::DownloadBinlogAction(
213
        ExecEnv* exec_env, StorageEngine& engine,
214
        std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
215
0
        : HttpHandlerWithAuth(exec_env),
216
0
          _engine(engine),
217
0
          _rate_limit_group(std::move(rate_limit_group)) {}
218
219
0
// Entry point of the download-binlog HTTP action.
// Rejects every request while config::enable_feature_binlog is off, then runs the token check
// (currently a placeholder, see below). Next it dispatches on the `method` URL parameter and
// records the handler's latency in the matching bvar recorder. Unknown methods get
// NOT_IMPLEMENTED.
void DownloadBinlogAction::handle(HttpRequest* req) {
    VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();

    if (!config::enable_feature_binlog) {
        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
                                "binlog feature is not enabled.");
        return;
    }

    // Step 1: check token.
    // NOTE(review): the _check_token call below is commented out, so `token_status` keeps its
    // default-constructed value; assuming a default Status is OK, this branch never rejects.
    if (config::enable_token_check) {
        Status token_status;
        // FIXME(Drogon): support check token
        // token_status = _check_token(req);
        if (!token_status.ok()) {
            HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, token_status.to_string());
            return;
        }
    }

    // Step 2: read the requested method.
    const std::string& requested_method = req->param(kMethodParameter);

    // Step 3: dispatch, timing each handler.
    MonotonicStopWatch stopwatch;
    stopwatch.start();

    if (requested_method == "get_binlog_info") {
        handle_get_binlog_info(_engine, req);
        g_get_binlog_info_latency << stopwatch.elapsed_time_microseconds();
        return;
    }
    if (requested_method == "get_segment_file") {
        handle_get_segment_file(_engine, req, _rate_limit_group.get());
        g_get_segment_file_latency << stopwatch.elapsed_time_microseconds();
        return;
    }
    if (requested_method == "get_segment_index_file") {
        handle_get_segment_index_file(_engine, req, _rate_limit_group.get());
        g_get_segment_index_file_latency << stopwatch.elapsed_time_microseconds();
        return;
    }
    if (requested_method == "get_rowset_meta") {
        handle_get_rowset_meta(_engine, req);
        g_get_rowset_meta_latency << stopwatch.elapsed_time_microseconds();
        return;
    }

    const auto error_msg = fmt::format("invalid method: {}", requested_method);
    LOG(WARNING) << error_msg;
    HttpChannel::send_reply(req, HttpStatus::NOT_IMPLEMENTED, error_msg);
}
263
264
0
// Validates the `token` URL parameter against this node's token (_exec_env->token()).
// Returns InternalError when the parameter is missing or empty, NotAuthorized when it does not
// match, and OK otherwise. The call site in handle() is currently commented out.
Status DownloadBinlogAction::_check_token(HttpRequest* req) {
    const std::string& token_str = req->param(kTokenParameter);
    if (token_str.empty()) {
        return Status::InternalError("token is not specified.");
    }

    const std::string& local_token = _exec_env->token();
    if (token_str != local_token) {
        // Do not log local_token: it is the secret this check protects, and writing it to the
        // log would hand it to anyone with log access.
        LOG(WARNING) << "invalid download token: " << token_str;
        return Status::NotAuthorized("invalid token {}", token_str);
    }

    return Status::OK();
}
278
279
} // end namespace doris