Coverage Report

Created: 2026-03-24 20:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/http_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/http_file_reader.h"
19
20
#include <curl/curl.h>
21
#include <curl/easy.h>
22
23
#include <algorithm>
24
25
#include "common/config.h"
26
#include "common/logging.h"
27
#include "gen_cpp/internal_service.pb.h"
28
#include "runtime/cdc_client_mgr.h"
29
#include "runtime/exec_env.h"
30
31
namespace doris::io {
32
33
/// Factory for HTTP-backed file readers.
///
/// Packs `url` and `props` into an OpenFileInfo, constructs the reader with
/// `opts.mtime`, and calls open() on it before returning it, so the caller
/// only ever receives a reader whose open() succeeded.
///
/// @param url    Location of the remote file; also used as the reader's path.
/// @param props  Caller-supplied properties, forwarded as `extend_info`.
/// @param opts   Reader options; `mtime` is forwarded to the constructor and
///               the whole struct is passed to open().
/// @return The opened reader, or the error produced by open().
Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
                                              const std::map<std::string, std::string>& props,
                                              const FileReaderOptions& opts,
                                              RuntimeProfile* /*profile*/) {
    OpenFileInfo file_info;
    file_info.extend_info = props;
    file_info.path = Path(url);

    auto http_reader = std::make_shared<HttpFileReader>(file_info, url, opts.mtime);

    // open() performs Range-support detection and configuration validation;
    // any failure is surfaced to the caller instead of a half-ready reader.
    RETURN_IF_ERROR_RESULT(http_reader->open(opts));

    return http_reader;
}
48
49
/// Constructs a reader for `url` and seeds its state from the properties in
/// `fileInfo.extend_info`.
///
/// Property keys read here:
///  - "etag":                         stored as-is.
///  - "last_modified":                parsed as a signed integer.
///  - "file_size":                    parsed as an unsigned integer; a valid value marks
///                                    the reader as initialized, so open() skips metadata
///                                    detection.
///  - "http.enable.range.request":    "false" or "0" (case-insensitive) disables Range.
///  - "http.max.request.size.bytes":  size cap; falls back to DEFAULT_MAX_REQUEST_SIZE
///                                    when the value cannot be parsed.
///  - "http.enable.chunk.response":   "true" or "1" (case-insensitive) enables chunk
///                                    response mode, which forces Range support off.
///
/// Malformed numeric properties are logged and ignored instead of letting
/// std::stoll / std::stoull throw out of the constructor; this matches how
/// "http.max.request.size.bytes" is handled.
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
        : _extend_kv(fileInfo.extend_info),
          _path(fileInfo.path),
          _url(std::move(url)),
          _client(std::make_unique<HttpClient>()),
          _mtime(mtime) {
    // Lower-cases a copy of the input. The character is routed through
    // unsigned char because passing a negative char to tolower is undefined.
    const auto to_lower = [](std::string text) {
        std::transform(text.begin(), text.end(), text.begin(),
                       [](unsigned char c) { return static_cast<char>(::tolower(c)); });
        return text;
    };

    auto etag_iter = _extend_kv.find("etag");
    if (etag_iter != _extend_kv.end()) {
        _etag = etag_iter->second;
    }

    auto lm_iter = _extend_kv.find("last_modified");
    if (lm_iter != _extend_kv.end()) {
        try {
            _last_modified = std::stoll(lm_iter->second);
        } catch (const std::exception&) {
            // Keep the member's default rather than aborting construction.
            LOG(WARNING) << "Invalid last_modified value: " << lm_iter->second
                         << ", ignoring it";
        }
    }

    auto size_iter = _extend_kv.find("file_size");
    if (size_iter != _extend_kv.end()) {
        try {
            _file_size = std::stoull(size_iter->second);
            // Only a successfully parsed size lets open() skip detection.
            _initialized = true;
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid file_size value: " << size_iter->second
                         << ", ignoring it";
        }
    }

    // Parse configuration for non-Range request handling
    auto enable_range_iter = _extend_kv.find("http.enable.range.request");
    if (enable_range_iter != _extend_kv.end()) {
        // Case-insensitive comparison; anything other than "false"/"0" keeps Range enabled.
        const std::string value = to_lower(enable_range_iter->second);
        _enable_range_request = (value != "false" && value != "0");
    }

    auto max_size_iter = _extend_kv.find("http.max.request.size.bytes");
    if (max_size_iter != _extend_kv.end()) {
        try {
            _max_request_size_bytes = std::stoull(max_size_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid http.max.request.size.bytes value: " << max_size_iter->second
                         << ", using default: " << DEFAULT_MAX_REQUEST_SIZE;
            _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE;
        }
    }

    // Parse chunk response configuration; chunk response implies no Range support
    auto chunk_iter = _extend_kv.find("http.enable.chunk.response");
    if (chunk_iter != _extend_kv.end()) {
        const std::string value = to_lower(chunk_iter->second);
        _enable_chunk_response = (value == "true" || value == "1");
        if (_enable_chunk_response) {
            _range_supported = false;
        }
    }

    _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}
104
105
0
/// Tears the reader down by delegating to close(). The Status returned by
/// close() is intentionally discarded: a destructor has no way to report it.
HttpFileReader::~HttpFileReader() {
    Status close_status = close();
    static_cast<void>(close_status);
}
108
109
0
/// Prepares the reader for use.
///
/// Sequence:
///  1. If the "enable_cdc_client" property equals "true", start the CDC client
///     and substitute the first "CDC_CLIENT_PORT" placeholder in the URL with
///     the configured port. This runs even when the reader is already
///     initialized.
///  2. Return immediately if the reader is already initialized (the file size
///     was supplied through properties).
///  3. Chunk response mode: skip the HEAD request, mark the size as unknown
///     and report a size of 0 until the body has been downloaded.
///  4. Otherwise issue a HEAD request to learn the content length, then either
///     validate the size against the configured cap (Range disabled by
///     configuration) or probe the server for Range support (default).
///
/// @return OK on success; InternalError / NotSupported or the underlying
///         client error otherwise.
Status HttpFileReader::open(const FileReaderOptions& opts) {
    // CDC client setup must run before the _initialized guard
    const auto cdc_flag = _extend_kv.find("enable_cdc_client");
    const bool cdc_enabled = cdc_flag != _extend_kv.end() && cdc_flag->second == "true";
    if (cdc_enabled) {
        LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
        ExecEnv* exec_env = ExecEnv::GetInstance();
        if (exec_env == nullptr || exec_env->cdc_client_mgr() == nullptr) {
            return Status::InternalError("ExecEnv or CdcClientMgr is not initialized");
        }

        PRequestCdcClientResult cdc_result;
        Status cdc_status = exec_env->cdc_client_mgr()->start_cdc_client(&cdc_result);
        if (!cdc_status.ok()) {
            LOG(ERROR) << "Failed to start CDC client, status=" << cdc_status.to_string();
            return cdc_status;
        }

        // Swap the CDC_CLIENT_PORT placeholder for the real CDC client port.
        const std::string port_token = "CDC_CLIENT_PORT";
        const size_t token_pos = _url.find(port_token);
        if (token_pos != std::string::npos) {
            _url.replace(token_pos, port_token.size(),
                         std::to_string(doris::config::cdc_client_port));
        }
        LOG(INFO) << "CDC client started successfully for " << _url;
    }

    // The caller pre-supplied the file size: no metadata detection needed.
    if (_initialized) {
        return Status::OK();
    }

    if (_enable_chunk_response) {
        // Chunk streaming response: the size stays unknown until the stream
        // completes, and _range_supported was already cleared in the constructor.
        _size_known = false;
        // Replace the SIZE_MAX default with 0 so that a caller of size()
        // (e.g. NewJsonReader::_read_one_message) does not try to allocate
        // SIZE_MAX bytes before the download has finished.
        _file_size = 0;
        LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url;

        // Neither the size guard nor Range detection applies in this mode.
        _initialized = true;
        return Status::OK();
    }

    // Normal mode: a HEAD request provides the content length.
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
    _client->set_method(HttpMethod::HEAD);
    RETURN_IF_ERROR(_client->execute());

    uint64_t head_length = 0;
    RETURN_IF_ERROR(_client->get_content_length(&head_length));
    _file_size = head_length;
    _size_known = true;

    if (!_enable_range_request) {
        // Range disabled by configuration: the whole body will be fetched in
        // one request, so refuse files larger than the configured cap.
        _range_supported = false;
        LOG(INFO) << "Range requests disabled by configuration for " << _url;
        if (_file_size > _max_request_size_bytes) {
            return Status::InternalError(
                    "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} "
                    "bytes, configured by http.max.request.size.bytes). URL: {}",
                    _file_size, _max_request_size_bytes, _url);
        }
        LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
                  << " bytes, max allowed: " << _max_request_size_bytes << " bytes";

        _initialized = true;
        return Status::OK();
    }

    // Range is enabled (default): probe the server, then insist on support.
    VLOG(1) << "Detecting Range support for URL: " << _url;
    RETURN_IF_ERROR(detect_range_support());

    if (!_range_supported) {
        // Server does not support Range and Range is required
        return Status::NotSupported(
                "HTTP server does not support Range requests (RFC 7233), which is required "
                "for reading files. File size: {} bytes, URL: {}. "
                "To allow reading without Range support, set "
                "'http.enable.range.request'='false' "
                "in properties and configure 'http.max.request.size.bytes' appropriately "
                "(note: this may cause high memory usage for large files).",
                _file_size, _url);
    }

    LOG(INFO) << "HTTP server supports Range requests for " << _url;

    _initialized = true;
    return Status::OK();
}
203
204
/// Reads up to `result.size` bytes starting at `offset` into `result.data`.
///
/// Data is served, in order of preference, from:
///  1. the full-file cache (non-Range mode, after the first download);
///  2. the read-ahead buffer `_read_buffer`, when `offset` falls inside
///     [`_buffer_start`, `_buffer_end`);
///  3. a fresh HTTP request. With Range support the request covers at least
///     READ_BUFFER_SIZE bytes so small sequential reads can be answered from
///     the buffer; without Range support the whole body is downloaded once
///     and kept in `_full_file_cache`.
///
/// The HTTP method and body can be overridden through the "http.method" and
/// "http.payload" properties.
///
/// @param offset      Absolute byte offset in the remote file.
/// @param result      Destination slice; its size is the requested length.
/// @param bytes_read  Receives the number of bytes copied into `result`
///                    (0 at or past end of file).
/// @return OK on success (including short reads at end of file);
///         InternalError when the response exceeds the configured size cap,
///         the server answers with status >= 400, or the server ignores a
///         Range header; otherwise the error from the HTTP client.
Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* /*io_ctx*/) {
    VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
            << " url=" << _url << " range_supported=" << _range_supported;

    // close() releases the buffer; allocate it again on demand.
    if (!_read_buffer) {
        _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
    }

    size_t to_read = result.size;  // bytes still owed to the caller
    size_t buffer_offset = 0;      // bytes already written into result.data

    if (_size_known && offset >= _file_size) {
        *bytes_read = 0;
        return Status::OK();
    }

    // For non-Range mode, prioritize full file cache
    if (!_range_supported && _full_file_cached) {
        if (offset >= _full_file_cache.size()) {
            *bytes_read = 0;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, to_read);
        std::memcpy(result.data, _full_file_cache.data() + offset, copy_len);
        *bytes_read = copy_len;

        VLOG(2) << "Full file cache hit: copied " << copy_len << " bytes from offset " << offset;
        return Status::OK();
    }

    // Try to serve from buffer cache
    if (offset >= _buffer_start && offset < _buffer_end) {
        size_t buffer_idx = offset - _buffer_start;
        size_t available = _buffer_end - offset;
        size_t copy_len = std::min(available, to_read);

        DCHECK(buffer_idx + copy_len <= READ_BUFFER_SIZE)
                << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
                << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;

        std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
        // A partial hit advances the cursor; the remainder is fetched below.
        buffer_offset += copy_len;
        to_read -= copy_len;
        offset += copy_len;

        VLOG(2) << "Buffer cache hit: copied " << copy_len << " bytes";
    } else {
        // Buffer miss, invalidate cache
        _buffer_start = 0;
        _buffer_end = 0;
        VLOG(2) << "Buffer cache miss";
    }

    if (to_read == 0) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Clamp the request to what is left of the file when the size is known.
    size_t remaining = to_read;
    if (_size_known) {
        uint64_t left = (_file_size > offset) ? (_file_size - offset) : 0;
        if (left == 0) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }
        remaining = std::min<uint64_t>(to_read, left);
    }
    // Always ask for at least one buffer's worth so the surplus can be cached.
    size_t req_len = (remaining > READ_BUFFER_SIZE) ? remaining : READ_BUFFER_SIZE;

    VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
            << " with_range=" << _range_supported;

    // Prepare and initialize the HTTP client for request
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));

    // Determine HTTP method from configuration (default: GET)
    HttpMethod method = HttpMethod::GET;
    auto method_iter = _extend_kv.find("http.method");
    if (method_iter != _extend_kv.end()) {
        method = to_http_method(method_iter->second.c_str());
        if (method == HttpMethod::UNKNOWN) {
            LOG(WARNING) << "Invalid http.method value: " << method_iter->second
                         << ", falling back to GET";
            method = HttpMethod::GET;
        }
    }
    _client->set_method(method);

    // Set payload if configured (supports POST, PUT, DELETE, etc.)
    auto payload_iter = _extend_kv.find("http.payload");
    if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
        _client->set_payload(payload_iter->second);
        _client->set_content_type("application/json");
        VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size();
    }

    _client->set_header("Expect", "");
    _client->set_header("Connection", "close");

    // Snapshot the flag so the later checks refer to what was actually sent.
    bool with_range = _range_supported;
    if (with_range) _client->set_range(offset, req_len);

    std::string buf;
    buf.reserve(req_len);
    size_t total_received = 0;
    bool size_limit_exceeded = false;

    // Body callback: accumulates the response and aborts the transfer once
    // the non-Range size cap is exceeded.
    auto cb = [&](const void* data, size_t len) {
        total_received += len;

        // If using non-Range mode, enforce size limit to prevent OOM
        if (!_range_supported && total_received > _max_request_size_bytes) {
            size_limit_exceeded = true;
            VLOG(1) << "Stopping download: received " << total_received << " bytes, exceeds limit "
                    << _max_request_size_bytes;
            return false; // Stop receiving - this will cause CURL to return an error
        }

        buf.append(reinterpret_cast<const char*>(data), len);
        return true;
    };

    Status exec_status = _client->execute(cb);

    // An abort triggered by the callback is reported with a dedicated message
    // instead of the generic client error. The property name matches the key
    // parsed in the constructor ("http.max.request.size.bytes").
    if (size_limit_exceeded) {
        return Status::InternalError(
                "HTTP response too large: received {} bytes, exceeds maximum allowed size {} "
                "bytes (configured by http.max.request.size.bytes). URL: {}",
                total_received, _max_request_size_bytes, _url);
    }

    // If there's an error and it's not due to our size limit check, return it
    RETURN_IF_ERROR(exec_status);

    long http_status = _client->get_http_status();
    VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();

    // Check for HTTP error status codes (4xx, 5xx)
    if (http_status >= 400) {
        std::string error_body;
        if (buf.empty()) {
            error_body = "(empty response body)";
        } else {
            // Limit error message to 1024 bytes to avoid excessive logging
            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
            error_body = buf.substr(0, max_len);
        }

        return Status::InternalError("HTTP request failed with status {}: {}.", http_status,
                                     error_body);
    }

    if (buf.empty()) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Defensive check: if we sent Range but server returned 200 instead of 206
    // This should rarely happen since we detect Range support in open()
    if (with_range && offset > 0 && http_status == 200) {
        LOG(ERROR) << "HTTP server unexpectedly does not support Range requests for " << _url
                   << " (this should have been detected in open()). HTTP status: " << http_status
                   << ", received: " << buf.size()
                   << " bytes. This indicates a server behavior change.";

        return Status::InternalError(
                "HTTP server does not support Range requests but this was not detected during "
                "file open. This may indicate the server behavior has changed. "
                "HTTP status: {}, received: {} bytes. URL: {}",
                http_status, buf.size(), _url);
    }

    // Handle non-Range mode: cache the full file on first download
    if (!_range_supported && !_full_file_cached) {
        // Cache the complete file content for subsequent reads
        _full_file_cache = std::move(buf);
        _full_file_cached = true;
        // Now that the full content is in hand, update _file_size to the actual
        // byte count. This replaces the 0 placeholder set in open() for chunk
        // response mode, so subsequent calls to size() return a correct value.
        _file_size = _full_file_cache.size();
        _size_known = true;

        VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";

        // Serve the requested portion from cache
        if (offset >= _full_file_cache.size()) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, remaining);
        std::memcpy(result.data + buffer_offset, _full_file_cache.data() + offset, copy_len);
        *bytes_read = buffer_offset + copy_len;
        return Status::OK();
    }

    if (to_read > READ_BUFFER_SIZE) {
        // Large read: copy straight into the caller's slice, bypassing the
        // read-ahead buffer. A body longer than requested would overrun it.
        if (buf.size() > remaining) {
            return Status::InternalError("HTTP response larger than requested buffer");
        }
        std::memcpy(result.data + buffer_offset, buf.data(), buf.size());
        buffer_offset += buf.size();
    } else {
        // Small read: stash the response in the read-ahead buffer, then hand
        // the caller the part that was asked for.
        size_t cached = std::min(buf.size(), static_cast<size_t>(READ_BUFFER_SIZE));
        std::memcpy(_read_buffer.get(), buf.data(), cached);
        _buffer_start = offset;
        _buffer_end = offset + cached;

        size_t copy_len = std::min(remaining, cached);
        std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
        buffer_offset += copy_len;
    }

    // A short Range response means the end of the file was reached, which
    // pins down the previously unknown size.
    if (!_size_known && with_range && buf.size() < req_len) {
        _size_known = true;
        _file_size = offset + buf.size();
    }

    *bytes_read = buffer_offset;
    return Status::OK();
}
431
432
0
/// Releases the read buffer, the full-file cache and the HTTP client.
///
/// Only the first call does any work: `_closed` is flipped with exchange(),
/// and every later call sees the flag already set and returns immediately.
///
/// @return Always OK.
Status HttpFileReader::close() {
    const bool already_closed = _closed.exchange(true);
    if (already_closed) {
        return Status::OK();
    }

    // Drop the read-ahead buffer and forget the window it covered.
    _read_buffer.reset();
    _buffer_end = 0;
    _buffer_start = 0;

    // Empty the full-file cache and ask the string to give back its storage.
    _full_file_cached = false;
    _full_file_cache.clear();
    _full_file_cache.shrink_to_fit();

    // Finally let go of the HttpClient and whatever it holds.
    _client.reset();

    return Status::OK();
}
452
453
0
/// (Re)initializes the HTTP client for a new request against `_url` and
/// applies caller-defined headers.
///
/// Every property whose key starts with "http.header." is turned into a
/// request header: the remainder of the key is the header name and the
/// property value is the header value.
///
/// @param set_fail_on_error  Forwarded unchanged to HttpClient::init().
/// @return InternalError if the client has been released (e.g. after close()),
///         the error from HttpClient::init(), or OK.
Status HttpFileReader::prepare_client(bool set_fail_on_error) {
    if (_client == nullptr) {
        return Status::InternalError("HttpClient is not initialized");
    }

    // Bind the client to the target URL.
    RETURN_IF_ERROR(_client->init(_url, set_fail_on_error));

    // Forward "http.header.<Name>" properties as "<Name>" request headers.
    static constexpr char kHeaderPrefix[] = "http.header.";
    constexpr size_t kHeaderPrefixLen = sizeof(kHeaderPrefix) - 1;
    for (const auto& [key, value] : _extend_kv) {
        if (key.compare(0, kHeaderPrefixLen, kHeaderPrefix) != 0) {
            continue;
        }
        _client->set_header(key.substr(kHeaderPrefixLen), value);
    }

    return Status::OK();
}
470
471
0
/// Probes the server with a tiny Range GET and records the outcome in
/// `_range_supported`.
///
/// Interpretation of the response status:
///  - 206: Range is supported.
///  - 200: the server ignored the Range header; Range is not supported.
///  - anything else: logged as unexpected and treated as "not supported".
///
/// The body callback aborts the transfer after 10KB so a server that ignores
/// the Range header cannot force a full download during the probe; an error
/// caused by that abort is not treated as a failure.
///
/// @return OK when a verdict was reached, otherwise the client error.
Status HttpFileReader::detect_range_support() {
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);
    // Ask for the very start of the file only, to keep the probe cheap.
    _client->set_range(0, 1);

    constexpr size_t MAX_TEST_SIZE = 10240; // 10KB max for test
    std::string probe_body;
    size_t bytes_seen = 0;
    bool hit_probe_limit = false;

    auto on_data = [&](const void* data, size_t len) {
        bytes_seen += len;
        // Cap the probe so an unranged response is not downloaded in full.
        if (bytes_seen > MAX_TEST_SIZE) {
            hit_probe_limit = true;
            VLOG(2) << "Stopping Range detection test after receiving " << bytes_seen << " bytes";
            return false; // Makes CURL abort the transfer with an error
        }
        probe_body.append(reinterpret_cast<const char*>(data), len);
        return true;
    };

    Status exec_status = _client->execute(on_data);

    if (!exec_status.ok()) {
        if (!hit_probe_limit) {
            // Genuine transport/client failure.
            return exec_status;
        }
        // The abort was ours; carry on and judge by the status code.
        VLOG(1) << "Range detection stopped at size limit (expected): " << exec_status.to_string();
    }

    const long http_status = _client->get_http_status();

    switch (http_status) {
    case 206:
        // Partial Content: the server honoured the Range header.
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 206) for " << _url << ", received "
                << probe_body.size() << " bytes";
        break;
    case 200:
        // Plain OK: the server sent the file (or a large part of it) regardless.
        _range_supported = false;
        VLOG(1) << "Range not supported (HTTP 200) for " << _url << ", received "
                << probe_body.size() << " bytes in test";

        // A large probe body is further evidence that Range was ignored.
        if (probe_body.size() >= MAX_TEST_SIZE || hit_probe_limit) {
            LOG(WARNING) << "Server returned " << bytes_seen << "+ bytes for Range test, "
                         << "indicating no Range support for " << _url;
        }
        break;
    default:
        // Unexpected status code
        LOG(WARNING) << "Unexpected HTTP status " << http_status << " during Range detection for "
                     << _url << ", assuming Range is not supported";
        _range_supported = false;
        break;
    }

    return Status::OK();
}
534
535
} // namespace doris::io