Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/http_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/http_file_reader.h"
19
20
#include <curl/curl.h>
21
#include <curl/easy.h>
22
23
#include <algorithm>
24
25
#include "common/logging.h"
26
27
namespace doris::io {
28
29
/// Factory: builds an HttpFileReader for `url` and opens it before handing it out.
///
/// @param url   remote HTTP(S) URL, also used as the reader's path
/// @param props per-file properties (etag, last_modified, file_size, http.* options)
///              forwarded as the reader's extend_info
/// @param opts  reader options; `opts.mtime` is stored on the reader and `opts` is passed to open()
/// @return the opened reader, or the Status returned by open() on failure
Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
                                              const std::map<std::string, std::string>& props,
                                              const FileReaderOptions& opts,
                                              RuntimeProfile* /*profile*/) {
    OpenFileInfo file_info;
    file_info.path = Path(url);
    file_info.extend_info = props;

    auto http_reader = std::make_shared<HttpFileReader>(file_info, url, opts.mtime);

    // open() issues the HEAD request and Range-support probe (unless a file_size property
    // was supplied) and validates the Range / max-request-size configuration.
    RETURN_IF_ERROR_RESULT(http_reader->open(opts));
    return http_reader;
}
44
45
/// Builds a reader for `url` from the properties carried in `fileInfo.extend_info`.
///
/// Recognized properties:
///   - "etag"                        : stored verbatim in `_etag`
///   - "last_modified"               : parsed as a signed integer into `_last_modified`
///   - "file_size"                   : parsed as an unsigned integer; when valid, the reader is
///                                     marked initialized so open() skips the HEAD request
///   - "http.enable.range.request"   : "false"/"0" (case-insensitive) disables Range requests
///   - "http.max.request.size.bytes" : size cap for non-Range (full-file) downloads
///
/// Malformed numeric properties are logged and ignored rather than thrown out of the
/// constructor, matching how "http.max.request.size.bytes" was already handled.
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
        : _extend_kv(fileInfo.extend_info),
          _path(fileInfo.path),
          _url(std::move(url)),
          _client(std::make_unique<HttpClient>()),
          _mtime(mtime) {
    auto etag_iter = _extend_kv.find("etag");
    if (etag_iter != _extend_kv.end()) {
        _etag = etag_iter->second;
    }

    // std::stoll / std::stoull throw std::invalid_argument or std::out_of_range on bad input;
    // these strings are caller-supplied, so do not let such an exception escape the constructor.
    auto lm_iter = _extend_kv.find("last_modified");
    if (lm_iter != _extend_kv.end()) {
        try {
            _last_modified = std::stoll(lm_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid last_modified value: " << lm_iter->second << ", ignoring it";
        }
    }

    auto size_iter = _extend_kv.find("file_size");
    if (size_iter != _extend_kv.end()) {
        try {
            _file_size = std::stoull(size_iter->second);
            // A caller-provided size lets open() return early without a HEAD request.
            _initialized = true;
        } catch (const std::exception&) {
            // Leave _initialized false so open() detects the size with a HEAD request instead.
            LOG(WARNING) << "Invalid file_size value: " << size_iter->second
                         << ", file size will be detected via HEAD request";
        }
    }

    // Parse configuration for non-Range request handling
    auto enable_range_iter = _extend_kv.find("http.enable.range.request");
    if (enable_range_iter != _extend_kv.end()) {
        // Convert to lowercase for case-insensitive comparison. Go through unsigned char:
        // passing a negative char to tolower is undefined behavior.
        std::string value = enable_range_iter->second;
        std::transform(value.begin(), value.end(), value.begin(),
                       [](unsigned char c) { return static_cast<char>(::tolower(c)); });
        _enable_range_request = (value != "false" && value != "0");
    }

    auto max_size_iter = _extend_kv.find("http.max.request.size.bytes");
    if (max_size_iter != _extend_kv.end()) {
        try {
            _max_request_size_bytes = std::stoull(max_size_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid http.max.request.size.bytes value: " << max_size_iter->second
                         << ", using default: " << DEFAULT_MAX_REQUEST_SIZE;
            _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE;
        }
    }

    _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}
89
90
0
/// Destructor: releases buffers and the HTTP client via close().
/// close() is idempotent, so an earlier explicit close() makes this a no-op.
HttpFileReader::~HttpFileReader() {
    // A destructor cannot report failure, so the returned Status is intentionally dropped.
    [[maybe_unused]] Status close_status = close();
}
93
94
0
/// Fetches file metadata and decides how subsequent reads are served.
///
/// No-op when the reader is already initialized (e.g. a "file_size" property was given).
/// Otherwise:
///   1. A HEAD request determines the file size.
///   2. If Range requests are disabled by configuration, the file must fit within
///      `_max_request_size_bytes`, because reads will download the whole file.
///   3. Otherwise Range support is probed; a server without Range support is rejected.
///
/// @param opts reader options (currently unused here)
/// @return OK on success; the HEAD / probe error; InternalError if a non-Range file is too
///         large; NotSupported if the server lacks Range support while Range is required
Status HttpFileReader::open(const FileReaderOptions& opts) {
    if (_initialized) {
        return Status::OK();
    }

    // Step 1: HEAD request to get file metadata
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
    _client->set_method(HttpMethod::HEAD);
    RETURN_IF_ERROR(_client->execute());

    uint64_t content_length = 0;
    RETURN_IF_ERROR(_client->get_content_length(&content_length));

    _file_size = content_length;
    _size_known = true;

    // Step 2: Check if Range request is disabled by configuration
    if (!_enable_range_request) {
        // User explicitly disabled Range requests, use non-Range mode directly
        _range_supported = false;
        LOG(INFO) << "Range requests disabled by configuration for " << _url
                  << ", using non-Range mode. File size: " << _file_size << " bytes";

        // Check if file size exceeds limit for non-Range mode
        if (_file_size > _max_request_size_bytes) {
            return Status::InternalError(
                    "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, "
                    "configured by http.max.request.size.bytes). URL: {}",
                    _file_size, _max_request_size_bytes, _url);
        }

        LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
                  << " bytes, max allowed: " << _max_request_size_bytes << " bytes";
    } else if (_file_size == 0) {
        // Empty file: with the size known to be 0, read_at_impl() returns 0 bytes for every
        // offset and never issues a GET, so Range support is irrelevant. Probing it would
        // fail open(): detection only accepts 206, and a Range request on a zero-length
        // resource typically gets 416 (unsatisfiable) or a plain 200.
        VLOG(1) << "Skipping Range detection for empty file: " << _url;
    } else {
        // Step 3: Range request is enabled (default), detect Range support
        VLOG(1) << "Detecting Range support for URL: " << _url;
        RETURN_IF_ERROR(detect_range_support());

        // Step 4: Validate Range support detection result
        if (!_range_supported) {
            // Server does not support Range and Range is required
            return Status::NotSupported(
                    "HTTP server does not support Range requests (RFC 7233), which is required "
                    "for reading files. File size: {} bytes, URL: {}. "
                    "To allow reading without Range support, set "
                    "'http.enable.range.request'='false' "
                    "in properties and configure 'http.max.request.size.bytes' appropriately "
                    "(note: this may cause high memory usage for large files).",
                    _file_size, _url);
        }

        LOG(INFO) << "HTTP server supports Range requests for " << _url;
    }

    _initialized = true;
    return Status::OK();
}
151
152
/// Reads up to `result.size` bytes starting at `offset` into `result.data`.
///
/// Serving order:
///   1. Non-Range mode with the whole file already downloaded: copy from `_full_file_cache`.
///   2. A prefix of the request inside the read-ahead window [`_buffer_start`, `_buffer_end`)
///      is copied from `_read_buffer`.
///   3. The remainder is fetched with an HTTP GET. In Range mode this is a Range request of at
///      least READ_BUFFER_SIZE bytes (small reads refill the read-ahead buffer). In non-Range
///      mode the whole file is downloaded, bounded by `_max_request_size_bytes`, and kept in
///      `_full_file_cache` for later reads.
///
/// @param offset     absolute byte offset in the remote file
/// @param result     destination; at most `result.size` bytes are written
/// @param bytes_read set to the number of bytes copied (may be short at end of file)
/// @return OK, a transport error, InternalError for an oversized response, an unexpected
///         HTTP status, or a Range response that the server unexpectedly did not honor
Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* /*io_ctx*/) {
    VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
            << " url=" << _url << " range_supported=" << _range_supported;

    // close() releases the buffer; recreate it if needed.
    if (!_read_buffer) {
        _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
    }

    size_t to_read = result.size;
    size_t buffer_offset = 0;

    if (_size_known && offset >= _file_size) {
        *bytes_read = 0;
        return Status::OK();
    }

    // For non-Range mode, prioritize full file cache
    if (!_range_supported && _full_file_cached) {
        if (offset >= _full_file_cache.size()) {
            *bytes_read = 0;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, to_read);
        std::memcpy(result.data, _full_file_cache.data() + offset, copy_len);
        *bytes_read = copy_len;

        VLOG(2) << "Full file cache hit: copied " << copy_len << " bytes from offset " << offset;
        return Status::OK();
    }

    // Try to serve from buffer cache
    if (offset >= _buffer_start && offset < _buffer_end) {
        size_t buffer_idx = offset - _buffer_start;
        size_t available = _buffer_end - offset;
        size_t copy_len = std::min(available, to_read);

        DCHECK(buffer_idx + copy_len <= READ_BUFFER_SIZE)
                << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
                << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;

        std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
        buffer_offset += copy_len;
        to_read -= copy_len;
        offset += copy_len;

        VLOG(2) << "Buffer cache hit: copied " << copy_len << " bytes";
    } else {
        // Buffer miss, invalidate cache
        _buffer_start = 0;
        _buffer_end = 0;
        VLOG(2) << "Buffer cache miss";
    }

    if (to_read == 0) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Clamp the request to the known end of file.
    size_t remaining = to_read;
    if (_size_known) {
        uint64_t left = (_file_size > offset) ? (_file_size - offset) : 0;
        if (left == 0) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }
        remaining = std::min<uint64_t>(to_read, left);
    }
    // Always fetch at least a full buffer so small sequential reads hit the read-ahead buffer.
    size_t req_len = (remaining > READ_BUFFER_SIZE) ? remaining : READ_BUFFER_SIZE;

    VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
            << " with_range=" << _range_supported;

    // Prepare and initialize the HTTP client for GET request
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);

    _client->set_header("Expect", "");
    _client->set_header("Connection", "close");

    bool with_range = _range_supported;
    if (with_range) {
        _client->set_range(offset, req_len);
    }

    std::string buf;
    buf.reserve(req_len);
    size_t total_received = 0;
    bool size_limit_exceeded = false;

    auto cb = [&](const void* data, size_t len) {
        total_received += len;

        // If using non-Range mode, enforce size limit to prevent OOM
        if (!_range_supported && total_received > _max_request_size_bytes) {
            size_limit_exceeded = true;
            VLOG(1) << "Stopping download: received " << total_received << " bytes, exceeds limit "
                    << _max_request_size_bytes;
            return false; // Stop receiving - this will cause CURL to return an error
        }

        buf.append(reinterpret_cast<const char*>(data), len);
        return true;
    };

    Status exec_status = _client->execute(cb);

    // Check if we stopped due to size limit - this is expected behavior
    if (size_limit_exceeded) {
        return Status::InternalError(
                "HTTP response too large: received {} bytes, exceeds maximum allowed size {} "
                "bytes (configured by http.max.request.size.bytes). URL: {}",
                total_received, _max_request_size_bytes, _url);
    }

    // If there's an error and it's not due to our size limit check, return it
    RETURN_IF_ERROR(exec_status);

    long http_status = _client->get_http_status();
    VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();

    // The client was prepared with set_fail_on_error=false, so an error response does not make
    // execute() fail (presumably this maps to CURLOPT_FAILONERROR; confirm in HttpClient) and
    // its body lands in `buf`. Validate the status before treating `buf` as file content.
    if (with_range && http_status == 416) {
        // 416 Range Not Satisfiable: the requested offset is at or beyond the end of the file.
        *bytes_read = buffer_offset;
        return Status::OK();
    }
    if (http_status != 200 && http_status != 206) {
        return Status::InternalError(
                "HTTP GET request failed with unexpected status {}, received {} bytes. URL: {}",
                http_status, buf.size(), _url);
    }

    if (buf.empty()) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Defensive check: if we sent Range but server returned 200 instead of 206
    // This should rarely happen since we detect Range support in open()
    if (with_range && offset > 0 && http_status == 200) {
        LOG(ERROR) << "HTTP server unexpectedly does not support Range requests for " << _url
                   << " (this should have been detected in open()). HTTP status: " << http_status
                   << ", received: " << buf.size()
                   << " bytes. This indicates a server behavior change.";

        return Status::InternalError(
                "HTTP server does not support Range requests but this was not detected during "
                "file open. This may indicate the server behavior has changed. "
                "HTTP status: {}, received: {} bytes. URL: {}",
                http_status, buf.size(), _url);
    }

    // Handle non-Range mode: cache the full file on first download
    if (!_range_supported && !_full_file_cached) {
        // Cache the complete file content for subsequent reads
        _full_file_cache = std::move(buf);
        _full_file_cached = true;

        VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";

        // Serve the requested portion from cache
        if (offset >= _full_file_cache.size()) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, remaining);
        std::memcpy(result.data + buffer_offset, _full_file_cache.data() + offset, copy_len);
        *bytes_read = buffer_offset + copy_len;
        return Status::OK();
    }

    if (to_read > READ_BUFFER_SIZE) {
        // Large read: copy straight into the caller's buffer, bypassing the read-ahead buffer.
        if (buf.size() > remaining) {
            return Status::InternalError("HTTP response larger than requested buffer");
        }
        std::memcpy(result.data + buffer_offset, buf.data(), buf.size());
        buffer_offset += buf.size();
    } else {
        // Small read: refill the read-ahead buffer, then serve the request from it.
        size_t cached = std::min(buf.size(), (size_t)READ_BUFFER_SIZE);
        std::memcpy(_read_buffer.get(), buf.data(), cached);
        _buffer_start = offset;
        _buffer_end = offset + cached;

        size_t copy_len = std::min(remaining, cached);
        std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
        buffer_offset += copy_len;
    }

    // A short Range response reveals the end of file when the size was not known yet.
    if (!_size_known && with_range && buf.size() < req_len) {
        _size_known = true;
        _file_size = offset + buf.size();
    }

    *bytes_read = buffer_offset;
    return Status::OK();
}
339
340
0
/// Releases the read-ahead buffer, the full-file cache and the HTTP client.
///
/// Idempotent: `_closed` is swapped to true with exchange(), so only the first call performs
/// the cleanup and every later call returns OK immediately.
Status HttpFileReader::close() {
    const bool was_closed = _closed.exchange(true);
    if (was_closed) {
        return Status::OK();
    }

    // Free the read-ahead buffer and mark the window [start, end) as empty.
    _read_buffer.reset();
    _buffer_start = _buffer_end = 0;

    // Drop the non-Range full-file cache and ask the string to give back its capacity.
    _full_file_cache.clear();
    _full_file_cache.shrink_to_fit();
    _full_file_cached = false;

    // Destroy the HttpClient.
    _client.reset();

    return Status::OK();
}
360
361
0
/// (Re)initializes `_client` for `_url` and applies user-supplied headers.
///
/// Every property named "http.header.<Name>" is sent as header <Name> with the property's value.
///
/// @param set_fail_on_error forwarded to HttpClient::init()
/// @return InternalError if the client was already released (e.g. by close()),
///         otherwise the result of HttpClient::init()
Status HttpFileReader::prepare_client(bool set_fail_on_error) {
    if (_client == nullptr) {
        return Status::InternalError("HttpClient is not initialized");
    }

    RETURN_IF_ERROR(_client->init(_url, set_fail_on_error));

    static constexpr char kHeaderPrefix[] = "http.header.";
    static constexpr size_t kHeaderPrefixLen = sizeof(kHeaderPrefix) - 1;
    for (const auto& [key, value] : _extend_kv) {
        // Only keys that start with the prefix become headers; the prefix is stripped.
        if (key.compare(0, kHeaderPrefixLen, kHeaderPrefix) != 0) {
            continue;
        }
        _client->set_header(key.substr(kHeaderPrefixLen), value);
    }

    return Status::OK();
}
378
379
0
/// Probes whether the server honors HTTP Range requests and records it in `_range_supported`.
///
/// Sends a GET asking for only the first byte. A 206 response means Range is supported. A 200,
/// or any other status, is treated as unsupported. The probe body is capped at MAX_TEST_SIZE
/// bytes so a server that ignores Range does not stream the whole file; hitting that cap is
/// expected and not reported as an error.
///
/// @return OK once `_range_supported` is set, or the transport error from the probe request
Status HttpFileReader::detect_range_support() {
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);
    _client->set_range(0, 1); // first byte only, to keep the probe cheap

    constexpr size_t MAX_TEST_SIZE = 10240; // 10KB cap on probe data
    std::string probe_body;
    size_t bytes_seen = 0;
    bool hit_probe_limit = false;

    auto on_data = [&](const void* data, size_t len) {
        bytes_seen += len;
        if (bytes_seen <= MAX_TEST_SIZE) {
            probe_body.append(static_cast<const char*>(data), len);
            return true;
        }
        // Returning false aborts the transfer, which surfaces as an execute() error below.
        hit_probe_limit = true;
        VLOG(2) << "Stopping Range detection test after receiving " << bytes_seen << " bytes";
        return false;
    };

    Status exec_status = _client->execute(on_data);
    if (!exec_status.ok()) {
        if (!hit_probe_limit) {
            return exec_status; // genuine transport failure
        }
        // We aborted the transfer ourselves at the size cap; carry on with the status code.
        VLOG(1) << "Range detection stopped at size limit (expected): " << exec_status.to_string();
    }

    const long http_status = _client->get_http_status();
    switch (http_status) {
    case 206:
        // Partial Content: the Range header was honored.
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 206) for " << _url << ", received "
                << probe_body.size() << " bytes";
        break;
    case 200:
        // Plain OK: the Range header was ignored and the full body was sent.
        _range_supported = false;
        VLOG(1) << "Range not supported (HTTP 200) for " << _url << ", received "
                << probe_body.size() << " bytes in test";
        if (hit_probe_limit || probe_body.size() >= MAX_TEST_SIZE) {
            LOG(WARNING) << "Server returned " << bytes_seen << "+ bytes for Range test, "
                         << "indicating no Range support for " << _url;
        }
        break;
    default:
        LOG(WARNING) << "Unexpected HTTP status " << http_status << " during Range detection for "
                     << _url << ", assuming Range is not supported";
        _range_supported = false;
        break;
    }

    return Status::OK();
}
442
443
} // namespace doris::io