Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/download_action.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/download_action.h"
19
20
#include <memory>
21
#include <string>
22
#include <utility>
23
24
#include "common/config.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "io/fs/local_file_system.h"
28
#include "runtime/exec_env.h"
29
#include "service/http/http_channel.h"
30
#include "service/http/http_request.h"
31
#include "service/http/utils.h"
32
33
namespace doris {
34
namespace {
35
const std::string FILE_PARAMETER = "file";
36
const std::string TOKEN_PARAMETER = "token";
37
const std::string CHANNEL_PARAMETER = "channel";
38
const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
39
const std::string ACQUIRE_MD5_PARAMETER = "acquire_md5";
40
} // namespace
41
42
DownloadAction::DownloadAction(ExecEnv* exec_env,
43
                               std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
44
                               const std::vector<std::string>& allow_dirs, int32_t num_workers)
45
22
        : HttpHandlerWithAuth(exec_env),
46
22
          _download_type(NORMAL),
47
22
          _num_workers(num_workers),
48
22
          _rate_limit_group(std::move(rate_limit_group)) {
49
34
    for (const auto& dir : allow_dirs) {
50
34
        std::string p;
51
34
        Status st = io::global_local_filesystem()->canonicalize(dir, &p);
52
34
        if (!st.ok()) {
53
0
            continue;
54
0
        }
55
34
        _allow_paths.emplace_back(std::move(p));
56
34
    }
57
22
    if (_num_workers > 0) {
58
        // for single-replica-load
59
6
        static_cast<void>(ThreadPoolBuilder("DownloadThreadPool")
60
6
                                  .set_min_threads(num_workers)
61
6
                                  .set_max_threads(num_workers)
62
6
                                  .build(&_download_workers));
63
6
    }
64
22
}
65
66
DownloadAction::DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir)
67
8
        : HttpHandlerWithAuth(exec_env), _download_type(ERROR_LOG), _num_workers(0) {
68
8
#ifndef BE_TEST
69
8
    static_cast<void>(
70
8
            io::global_local_filesystem()->canonicalize(error_log_root_dir, &_error_log_root_dir));
71
8
#endif
72
8
}
73
74
4
void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_param) {
75
    // check token
76
4
    Status status;
77
4
    if (config::enable_token_check) {
78
4
        status = check_token(req);
79
4
        if (!status.ok()) {
80
0
            std::string error_msg = status.to_string();
81
0
            if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
82
0
                HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
83
0
                return;
84
0
            } else {
85
0
                HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
86
0
                return;
87
0
            }
88
0
        }
89
4
    }
90
91
4
    status = check_path_is_allowed(file_param);
92
4
    if (!status.ok()) {
93
0
        std::string error_msg = status.to_string();
94
0
        if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) {
95
0
            HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
96
0
            return;
97
0
        } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
98
0
            HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
99
0
            return;
100
0
        } else {
101
0
            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
102
0
            return;
103
0
        }
104
0
    }
105
106
4
    bool is_dir = false;
107
4
    status = io::global_local_filesystem()->is_directory(file_param, &is_dir);
108
4
    if (!status.ok()) {
109
0
        HttpChannel::send_reply(req, status.to_string());
110
0
        return;
111
0
    }
112
113
4
    if (is_dir) {
114
1
        do_dir_response(file_param, req);
115
3
    } else {
116
3
        const auto& channel = req->param(CHANNEL_PARAMETER);
117
3
        bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
118
3
        bool is_acquire_md5 = !req->param(ACQUIRE_MD5_PARAMETER).empty();
119
3
        auto* rate_limit_group = ingest_binlog ? _rate_limit_group.get() : nullptr;
120
3
        do_file_response(file_param, req, rate_limit_group, is_acquire_md5);
121
3
    }
122
4
}
123
124
0
void DownloadAction::handle_error_log(HttpRequest* req, const std::string& file_param) {
125
0
    const std::string absolute_path = _error_log_root_dir + "/" + file_param;
126
127
0
    Status status = check_log_path_is_allowed(absolute_path);
128
0
    if (!status.ok()) {
129
0
        std::string error_msg = status.to_string();
130
0
        if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
131
0
            HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
132
0
            return;
133
0
        } else {
134
0
            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
135
0
            return;
136
0
        }
137
0
    }
138
139
0
    bool is_dir = false;
140
0
    status = io::global_local_filesystem()->is_directory(absolute_path, &is_dir);
141
0
    if (!status.ok()) {
142
0
        std::string error_msg = status.to_string();
143
0
        if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::IO_ERROR>()) {
144
0
            HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, error_msg);
145
0
            return;
146
0
        } else if (status.is<ErrorCode::NOT_AUTHORIZED>()) {
147
0
            HttpChannel::send_reply(req, HttpStatus::UNAUTHORIZED, error_msg);
148
0
            return;
149
0
        } else {
150
0
            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, error_msg);
151
0
            return;
152
0
        }
153
0
    }
154
0
    if (is_dir) {
155
0
        std::string error_msg = "error log can only be file.";
156
0
        HttpChannel::send_reply(req, error_msg);
157
0
        return;
158
0
    }
159
160
0
    do_file_response(absolute_path, req);
161
0
}
162
163
4
void DownloadAction::handle(HttpRequest* req) {
164
4
    if (_num_workers > 0) {
165
        // async for heavy download job, currently mainly for single-replica-load
166
0
        auto status = _download_workers->submit_func([this, req]() { _handle(req); });
167
0
        if (!status.ok()) {
168
0
            HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.to_string());
169
0
        }
170
4
    } else {
171
4
        _handle(req);
172
4
    }
173
4
}
174
175
4
void DownloadAction::_handle(HttpRequest* req) {
176
4
    VLOG_CRITICAL << "accept one download request " << req->debug_string();
177
178
    // Get 'file' parameter, then assembly file absolute path
179
4
    const std::string& file_path = req->param(FILE_PARAMETER);
180
4
    if (file_path.empty()) {
181
0
        std::string error_msg =
182
0
                std::string("parameter " + FILE_PARAMETER + " not specified in url.");
183
0
        HttpChannel::send_reply(req, error_msg);
184
0
        return;
185
0
    }
186
187
4
    if (_download_type == ERROR_LOG) {
188
0
        handle_error_log(req, file_path);
189
4
    } else if (_download_type == NORMAL) {
190
4
        handle_normal(req, file_path);
191
4
    }
192
193
4
    VLOG_CRITICAL << "deal with download request finished! ";
194
4
}
195
196
4
Status DownloadAction::check_token(HttpRequest* req) {
197
4
    const std::string& token_str = req->param(TOKEN_PARAMETER);
198
4
    if (token_str.empty()) {
199
0
        return Status::NotAuthorized("token is not specified.");
200
0
    }
201
202
4
    const std::string& local_token = _exec_env->token();
203
4
    if (token_str != local_token) {
204
0
        LOG(WARNING) << "invalid download token: " << token_str << ", local token: " << local_token;
205
0
        return Status::NotAuthorized("invalid token {}", token_str);
206
0
    }
207
208
4
    return Status::OK();
209
4
}
210
211
4
Status DownloadAction::check_path_is_allowed(const std::string& file_path) {
212
4
    DCHECK_EQ(_download_type, NORMAL);
213
214
4
    std::string canonical_file_path;
215
4
    RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path));
216
4
    for (auto& allow_path : _allow_paths) {
217
4
        if (io::LocalFileSystem::contain_path(allow_path, canonical_file_path)) {
218
4
            return Status::OK();
219
4
        }
220
4
    }
221
222
0
    return Status::NotAuthorized("file path is not allowed: {}", canonical_file_path);
223
4
}
224
225
0
Status DownloadAction::check_log_path_is_allowed(const std::string& file_path) {
226
0
    DCHECK_EQ(_download_type, ERROR_LOG);
227
228
0
    std::string canonical_file_path;
229
0
    RETURN_IF_ERROR(io::global_local_filesystem()->canonicalize(file_path, &canonical_file_path));
230
0
    if (io::LocalFileSystem::contain_path(_error_log_root_dir, canonical_file_path)) {
231
0
        return Status::OK();
232
0
    }
233
234
0
    return Status::NotAuthorized("file path is not allowed: {}", file_path);
235
0
}
236
237
} // end namespace doris