Coverage Report

Created: 2026-03-25 14:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/http_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/http_file_reader.h"
19
20
#include <curl/curl.h>
21
#include <curl/easy.h>
22
23
#include <algorithm>
24
25
#include "common/config.h"
26
#include "common/logging.h"
27
#include "gen_cpp/internal_service.pb.h"
28
#include "runtime/cdc_client_mgr.h"
29
#include "runtime/exec_env.h"
30
31
namespace doris::io {
32
33
/// Factory for HTTP-backed file readers.
///
/// Wraps `url` and `props` in an OpenFileInfo, constructs the reader and opens it
/// immediately, so a reader is only handed out after open() has succeeded.
///
/// @param url    location of the remote file
/// @param props  string properties forwarded to the reader as OpenFileInfo::extend_info
/// @param opts   reader options; `opts.mtime` is passed to the constructor and the whole
///               object is forwarded to open()
/// @return the opened reader, or the error produced by open()
Result<FileReaderSPtr> HttpFileReader::create(const std::string& url,
                                              const std::map<std::string, std::string>& props,
                                              const FileReaderOptions& opts,
                                              RuntimeProfile* /*profile*/) {
    OpenFileInfo file_info;
    file_info.extend_info = props;
    file_info.path = Path(url);

    std::shared_ptr<HttpFileReader> http_reader =
            std::make_shared<HttpFileReader>(file_info, url, opts.mtime);

    // Fail construction early if the reader cannot be opened.
    RETURN_IF_ERROR_RESULT(http_reader->open(opts));
    return http_reader;
}
48
49
/// Builds a reader for `url` and configures it from the string properties carried in
/// `fileInfo.extend_info`.
///
/// Recognized property keys:
///   - "etag": copied into _etag.
///   - "last_modified": parsed as a signed integer into _last_modified.
///   - "file_size": parsed as an unsigned integer into _file_size; when it parses, the reader
///     is marked initialized so that open() skips metadata detection.
///   - "http.enable.range.request": "false" or "0" (case-insensitive) disables Range requests.
///   - "http.max.request.size.bytes": size cap applied in non-Range mode; falls back to
///     DEFAULT_MAX_REQUEST_SIZE when it does not parse.
///   - "http.enable.chunk.response": "true" or "1" (case-insensitive) enables chunk response
///     mode, which also forces _range_supported to false.
///
/// Malformed numeric properties are logged and ignored instead of letting the parsing
/// exception escape the constructor.
///
/// @param fileInfo  supplies the path and the property map
/// @param url       request URL; moved into the reader
/// @param mtime     modification time stored in _mtime
HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, int64_t mtime)
        : _extend_kv(fileInfo.extend_info),
          _path(fileInfo.path),
          _url(std::move(url)),
          _client(std::make_unique<HttpClient>()),
          _mtime(mtime) {
    // tolower() is undefined for negative char values, so route every byte through
    // unsigned char before converting.
    const auto to_lower = [](std::string text) {
        std::transform(text.begin(), text.end(), text.begin(),
                       [](unsigned char ch) { return static_cast<char>(::tolower(ch)); });
        return text;
    };

    auto etag_iter = _extend_kv.find("etag");
    if (etag_iter != _extend_kv.end()) {
        _etag = etag_iter->second;
    }

    auto lm_iter = _extend_kv.find("last_modified");
    if (lm_iter != _extend_kv.end()) {
        try {
            _last_modified = std::stoll(lm_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid last_modified value: " << lm_iter->second
                         << ", ignoring it";
        }
    }

    auto size_iter = _extend_kv.find("file_size");
    if (size_iter != _extend_kv.end()) {
        try {
            _file_size = std::stoull(size_iter->second);
            // A caller-supplied size lets open() skip metadata detection.
            _initialized = true;
        } catch (const std::exception&) {
            // Leave the reader uninitialized so open() detects the metadata itself.
            LOG(WARNING) << "Invalid file_size value: " << size_iter->second
                         << ", ignoring it";
        }
    }

    // Parse configuration for non-Range request handling
    auto enable_range_iter = _extend_kv.find("http.enable.range.request");
    if (enable_range_iter != _extend_kv.end()) {
        const std::string value = to_lower(enable_range_iter->second);
        _enable_range_request = (value != "false" && value != "0");
    }

    auto max_size_iter = _extend_kv.find("http.max.request.size.bytes");
    if (max_size_iter != _extend_kv.end()) {
        try {
            _max_request_size_bytes = std::stoull(max_size_iter->second);
        } catch (const std::exception&) {
            LOG(WARNING) << "Invalid http.max.request.size.bytes value: " << max_size_iter->second
                         << ", using default: " << DEFAULT_MAX_REQUEST_SIZE;
            _max_request_size_bytes = DEFAULT_MAX_REQUEST_SIZE;
        }
    }

    // Parse chunk response configuration; chunk response implies no Range support
    auto chunk_iter = _extend_kv.find("http.enable.chunk.response");
    if (chunk_iter != _extend_kv.end()) {
        const std::string value = to_lower(chunk_iter->second);
        _enable_chunk_response = (value == "true" || value == "1");
        if (_enable_chunk_response) {
            _range_supported = false;
        }
    }

    _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}
104
105
14
/// Closes the reader on destruction. The Status returned by close() is intentionally
/// discarded.
HttpFileReader::~HttpFileReader() {
    [[maybe_unused]] const Status close_status = close();
}
108
109
14
/// Starts the CDC client when the "enable_cdc_client" property is exactly "true", then
/// substitutes the configured CDC client port for the first "CDC_CLIENT_PORT" placeholder
/// in _url.
///
/// @return OK when CDC is not requested or was started; InternalError when ExecEnv or its
///         CdcClientMgr is unavailable; otherwise the error from start_cdc_client().
Status HttpFileReader::setup_cdc_client() {
    const auto cdc_flag = _extend_kv.find("enable_cdc_client");
    const bool cdc_requested = cdc_flag != _extend_kv.end() && cdc_flag->second == "true";
    if (!cdc_requested) {
        return Status::OK();
    }

    LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
    ExecEnv* exec_env = ExecEnv::GetInstance();
    if (exec_env == nullptr || exec_env->cdc_client_mgr() == nullptr) {
        return Status::InternalError("ExecEnv or CdcClientMgr is not initialized");
    }

    PRequestCdcClientResult start_result;
    Status status = exec_env->cdc_client_mgr()->start_cdc_client(&start_result);
    if (!status.ok()) {
        LOG(ERROR) << "Failed to start CDC client, status=" << status.to_string();
        return status;
    }

    // Swap the port placeholder (first occurrence only) for the real CDC client port.
    const std::string port_token = "CDC_CLIENT_PORT";
    if (const size_t token_pos = _url.find(port_token); token_pos != std::string::npos) {
        _url.replace(token_pos, port_token.size(),
                     std::to_string(doris::config::cdc_client_port));
    }
    LOG(INFO) << "CDC client started successfully for " << _url;
    return Status::OK();
}
137
138
14
/// Prepares the reader for reading: runs CDC client setup, then (unless the reader is
/// already initialized) determines the file size and whether Range requests will be used.
///
/// Modes:
///   - chunk response: no HEAD request; size is marked unknown and _file_size is set to 0.
///   - Range disabled by configuration: HEAD for the size, which must not exceed
///     _max_request_size_bytes.
///   - default: HEAD for the size, then detect_range_support(); a server without Range
///     support is rejected.
///
/// @return OK on success; InternalError when the file is too large for non-Range mode;
///         NotSupported when Range is required but not available; or any error propagated
///         from the HTTP client.
Status HttpFileReader::open(const FileReaderOptions& opts) {
    // CDC client setup runs before the _initialized guard, i.e. on every open() call.
    RETURN_IF_ERROR(setup_cdc_client());

    // Already initialized (e.g. file size pre-supplied by the caller): nothing to detect.
    if (_initialized) {
        return Status::OK();
    }

    if (_enable_chunk_response) {
        // Chunk streaming response: the size is unknown until the stream completes, and
        // _range_supported was already cleared by the constructor.
        _size_known = false;
        // Report 0 rather than a stale value so callers of size() do not size allocations
        // from it before the download completes.
        _file_size = 0;
        LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url;

        _initialized = true;
        return Status::OK();
    }

    // Normal mode: a HEAD request provides the file size.
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
    _client->set_method(HttpMethod::HEAD);
    RETURN_IF_ERROR(_client->execute());

    uint64_t head_length = 0;
    RETURN_IF_ERROR(_client->get_content_length(&head_length));
    _file_size = head_length;
    _size_known = true;

    if (!_enable_range_request) {
        // Range disabled by configuration: the whole file will be fetched in one request,
        // so enforce the size cap up front.
        _range_supported = false;
        LOG(INFO) << "Range requests disabled by configuration for " << _url;
        if (_file_size > _max_request_size_bytes) {
            return Status::InternalError(
                    "Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} "
                    "bytes, configured by http.max.request.size.bytes). URL: {}",
                    _file_size, _max_request_size_bytes, _url);
        }
        LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
                  << " bytes, max allowed: " << _max_request_size_bytes << " bytes";

        _initialized = true;
        return Status::OK();
    }

    // Range is enabled (default): probe the server and require that it honors Range.
    VLOG(1) << "Detecting Range support for URL: " << _url;
    RETURN_IF_ERROR(detect_range_support());
    if (!_range_supported) {
        return Status::NotSupported(
                "HTTP server does not support Range requests (RFC 7233), which is required "
                "for reading files. File size: {} bytes, URL: {}. "
                "To allow reading without Range support, set "
                "'http.enable.range.request'='false' "
                "in properties and configure 'http.max.request.size.bytes' appropriately "
                "(note: this may cause high memory usage for large files).",
                _file_size, _url);
    }
    LOG(INFO) << "HTTP server supports Range requests for " << _url;

    _initialized = true;
    return Status::OK();
}
211
212
/// Reads up to `result.size` bytes starting at `offset` into `result.data`.
///
/// Data is taken, in order of preference, from:
///   1. the full-file cache (non-Range mode, once the body has been downloaded);
///   2. the read buffer `_read_buffer`, which may satisfy a prefix of the request;
///   3. a new HTTP request for whatever is still missing.
///
/// In Range mode the request asks for at least READ_BUFFER_SIZE bytes; responses to reads of
/// at most READ_BUFFER_SIZE bytes are kept in `_read_buffer` for later calls. In non-Range
/// mode the first successful non-empty response is cached whole and fixes `_file_size`.
///
/// The request method comes from the "http.method" property (GET when absent or invalid)
/// and a non-empty "http.payload" property is sent as an "application/json" body.
///
/// @param offset      absolute byte offset to start reading from
/// @param result      destination slice; `result.size` is the number of bytes wanted
/// @param bytes_read  out: number of bytes written to `result.data`; written only when OK is
///                    returned
/// @return OK on success, including end-of-file with zero bytes; InternalError when the
///         server answers with status >= 400, the non-Range size cap is exceeded, the server
///         ignores a Range header at a non-zero offset, or a Range response is larger than
///         requested; otherwise the error propagated from the HTTP client.
Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                    const IOContext* /*io_ctx*/) {
    VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
            << " url=" << _url << " range_supported=" << _range_supported;

    // close() releases the buffer, so make sure one exists before it is used below.
    if (!_read_buffer) {
        _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
    }

    size_t to_read = result.size;
    size_t buffer_offset = 0; // Bytes already written into result.data.

    // Reading at or beyond a known end of file yields zero bytes rather than an error.
    if (_size_known && offset >= _file_size) {
        *bytes_read = 0;
        return Status::OK();
    }

    // For non-Range mode, prioritize full file cache
    if (!_range_supported && _full_file_cached) {
        if (offset >= _full_file_cache.size()) {
            *bytes_read = 0;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, to_read);
        std::memcpy(result.data, _full_file_cache.data() + offset, copy_len);
        *bytes_read = copy_len;

        VLOG(2) << "Full file cache hit: copied " << copy_len << " bytes from offset " << offset;
        return Status::OK();
    }

    // Try to serve from buffer cache
    if (offset >= _buffer_start && offset < _buffer_end) {
        size_t buffer_idx = offset - _buffer_start;
        size_t available = _buffer_end - offset;
        size_t copy_len = std::min(available, to_read);

        DCHECK(buffer_idx + copy_len <= READ_BUFFER_SIZE)
                << "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
                << " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;

        std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
        // Advance past the part served from the buffer; anything left is fetched over HTTP.
        buffer_offset += copy_len;
        to_read -= copy_len;
        offset += copy_len;

        VLOG(2) << "Buffer cache hit: copied " << copy_len << " bytes";
    } else {
        // Buffer miss, invalidate cache
        _buffer_start = 0;
        _buffer_end = 0;
        VLOG(2) << "Buffer cache miss";
    }

    if (to_read == 0) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Never ask for more than the file is known to contain.
    size_t remaining = to_read;
    if (_size_known) {
        uint64_t left = (_file_size > offset) ? (_file_size - offset) : 0;
        if (left == 0) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }
        remaining = std::min<uint64_t>(to_read, left);
    }
    // Request at least one buffer's worth so that small reads can be cached.
    size_t req_len = (remaining > READ_BUFFER_SIZE) ? remaining : READ_BUFFER_SIZE;

    VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
            << " with_range=" << _range_supported;

    // Prepare and initialize the HTTP client for request
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));

    // Determine HTTP method from configuration (default: GET)
    HttpMethod method = HttpMethod::GET;
    auto method_iter = _extend_kv.find("http.method");
    if (method_iter != _extend_kv.end()) {
        method = to_http_method(method_iter->second.c_str());
        if (method == HttpMethod::UNKNOWN) {
            LOG(WARNING) << "Invalid http.method value: " << method_iter->second
                         << ", falling back to GET";
            method = HttpMethod::GET;
        }
    }
    _client->set_method(method);

    // Set payload if configured (supports POST, PUT, DELETE, etc.)
    auto payload_iter = _extend_kv.find("http.payload");
    if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
        _client->set_payload(payload_iter->second);
        _client->set_content_type("application/json");
        VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size();
    }

    _client->set_header("Expect", "");
    _client->set_header("Connection", "close");

    bool with_range = _range_supported;
    if (with_range) {
        _client->set_range(offset, req_len);
    }

    std::string buf;
    buf.reserve(req_len);
    size_t total_received = 0;
    bool size_limit_exceeded = false;

    auto cb = [&](const void* data, size_t len) {
        total_received += len;

        // If using non-Range mode, enforce size limit to prevent OOM
        if (!_range_supported && total_received > _max_request_size_bytes) {
            size_limit_exceeded = true;
            VLOG(1) << "Stopping download: received " << total_received << " bytes, exceeds limit "
                    << _max_request_size_bytes;
            return false; // Stop receiving - this will cause CURL to return an error
        }

        buf.append(reinterpret_cast<const char*>(data), len);
        return true;
    };

    Status exec_status = _client->execute(cb);

    // Aborting the transfer in the callback surfaces as a client error; report it as the
    // size-limit violation it really is, naming the property that controls the limit.
    if (size_limit_exceeded) {
        return Status::InternalError(
                "HTTP response too large: received {} bytes, exceeds maximum allowed size {} "
                "bytes (configured by http.max.request.size.bytes). URL: {}",
                total_received, _max_request_size_bytes, _url);
    }

    // If there's an error and it's not due to our size limit check, return it
    RETURN_IF_ERROR(exec_status);

    long http_status = _client->get_http_status();
    VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();

    // Check for HTTP error status codes (4xx, 5xx)
    if (http_status >= 400) {
        std::string error_body;
        if (buf.empty()) {
            error_body = "(empty response body)";
        } else {
            // Limit error message to 1024 bytes to avoid excessive logging
            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
            error_body = buf.substr(0, max_len);
        }

        return Status::InternalError("HTTP request failed with status {}: {}.", http_status,
                                     error_body);
    }

    if (buf.empty()) {
        *bytes_read = buffer_offset;
        return Status::OK();
    }

    // Defensive check: if we sent Range but server returned 200 instead of 206
    // This should rarely happen since we detect Range support in open()
    if (with_range && offset > 0 && http_status == 200) {
        LOG(ERROR) << "HTTP server unexpectedly does not support Range requests for " << _url
                   << " (this should have been detected in open()). HTTP status: " << http_status
                   << ", received: " << buf.size()
                   << " bytes. This indicates a server behavior change.";

        return Status::InternalError(
                "HTTP server does not support Range requests but this was not detected during "
                "file open. This may indicate the server behavior has changed. "
                "HTTP status: {}, received: {} bytes. URL: {}",
                http_status, buf.size(), _url);
    }

    // Handle non-Range mode: cache the full file on first download
    if (!_range_supported && !_full_file_cached) {
        // Cache the complete file content for subsequent reads
        _full_file_cache = std::move(buf);
        _full_file_cached = true;
        // The full content is in hand, so _file_size can now be set to the actual byte
        // count (chunk response mode leaves it at 0 until this point).
        _file_size = _full_file_cache.size();
        _size_known = true;

        VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";

        // Serve the requested portion from cache
        if (offset >= _full_file_cache.size()) {
            *bytes_read = buffer_offset;
            return Status::OK();
        }

        size_t available = _full_file_cache.size() - offset;
        size_t copy_len = std::min(available, remaining);
        std::memcpy(result.data + buffer_offset, _full_file_cache.data() + offset, copy_len);
        *bytes_read = buffer_offset + copy_len;
        return Status::OK();
    }

    if (to_read > READ_BUFFER_SIZE) {
        // Large read: copy straight into the caller's slice without touching the buffer.
        if (buf.size() > remaining) {
            return Status::InternalError("HTTP response larger than requested buffer");
        }
        std::memcpy(result.data + buffer_offset, buf.data(), buf.size());
        buffer_offset += buf.size();
    } else {
        // Small read: keep the response in the read buffer so later reads can hit it.
        size_t cached = std::min(buf.size(), static_cast<size_t>(READ_BUFFER_SIZE));
        std::memcpy(_read_buffer.get(), buf.data(), cached);
        _buffer_start = offset;
        _buffer_end = offset + cached;

        size_t copy_len = std::min(remaining, cached);
        std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
        buffer_offset += copy_len;
    }

    // A Range response shorter than requested means the end of the file was reached.
    if (!_size_known && with_range && buf.size() < req_len) {
        _size_known = true;
        _file_size = offset + buf.size();
    }

    *bytes_read = buffer_offset;
    return Status::OK();
}
439
440
14
/// Releases the read buffer, the full-file cache and the HTTP client.
///
/// Only the first call does any work; later calls see the `_closed` flag already set and
/// return immediately.
///
/// @return always OK
Status HttpFileReader::close() {
    const bool already_closed = _closed.exchange(true);
    if (already_closed) {
        return Status::OK();
    }

    // Drop the read buffer and reset the window it described.
    _read_buffer.reset();
    _buffer_start = _buffer_end = 0;

    // Drop the cached file body and ask the container to give back its storage.
    _full_file_cache.clear();
    _full_file_cache.shrink_to_fit();
    _full_file_cached = false;

    // Destroy the HTTP client.
    _client.reset();

    return Status::OK();
}
460
461
14
/// Re-initializes the HTTP client for `_url` and applies the caller-configured headers.
///
/// Every property whose key starts with "http.header." is sent as a request header named by
/// the remainder of the key.
///
/// @param set_fail_on_error  forwarded unchanged to HttpClient::init()
/// @return InternalError when the client has been released; the error from init() when it
///         fails; OK otherwise
Status HttpFileReader::prepare_client(bool set_fail_on_error) {
    if (_client == nullptr) {
        return Status::InternalError("HttpClient is not initialized");
    }

    RETURN_IF_ERROR(_client->init(_url, set_fail_on_error));

    const std::string header_prefix = "http.header.";
    for (const auto& [key, value] : _extend_kv) {
        // Skip properties that do not begin with the header prefix.
        if (key.compare(0, header_prefix.size(), header_prefix) != 0) {
            continue;
        }
        _client->set_header(key.substr(header_prefix.size()), value);
    }

    return Status::OK();
}
478
479
0
/// Probes whether the server honors HTTP Range requests and records the answer in
/// `_range_supported`.
///
/// A GET for only the first byte is sent. The response status decides the outcome:
///   - 206: Range is supported.
///   - 416 while the file size is known to be 0: the server evaluated the Range header and
///     rejected it only because there is no first byte to return, so Range is treated as
///     supported instead of rejecting empty files.
///   - 200: the server ignored the Range header; Range is not supported.
///   - anything else: Range is assumed to be unsupported.
///
/// At most MAX_TEST_SIZE bytes of body are accepted; a server that streams the whole file is
/// cut off and that abort is not treated as a failure.
///
/// @return OK once a decision has been recorded; otherwise the error from preparing or
///         executing the request.
Status HttpFileReader::detect_range_support() {
    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
    _client->set_method(HttpMethod::GET);
    _client->set_range(0, 1); // Request only the first byte

    constexpr size_t MAX_TEST_SIZE = 10240; // 10KB max for test
    size_t received = 0;                    // All bytes delivered, including the aborting chunk.
    size_t accepted = 0;                    // Bytes delivered before the limit was hit.
    bool stopped_by_limit = false;

    // Only the byte counts matter for the probe, so the body itself is not retained.
    auto cb = [&](const void* /*data*/, size_t len) {
        received += len;
        // Limit test data to prevent downloading too much
        if (received > MAX_TEST_SIZE) {
            stopped_by_limit = true;
            VLOG(2) << "Stopping Range detection test after receiving " << received << " bytes";
            return false; // This will cause CURL to stop with an error
        }
        accepted += len;
        return true;
    };

    Status exec_status = _client->execute(cb);

    if (!exec_status.ok()) {
        if (!stopped_by_limit) {
            // Real error
            return exec_status;
        }
        // The transfer was aborted by the callback above; that is expected, keep going.
        VLOG(1) << "Range detection stopped at size limit (expected): " << exec_status.to_string();
    }

    const long http_status = _client->get_http_status();

    if (http_status == 206) {
        // HTTP 206 Partial Content - server supports Range requests
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 206) for " << _url << ", received " << accepted
                << " bytes";
    } else if (http_status == 416 && _size_known && _file_size == 0) {
        // HTTP 416 Range Not Satisfiable on an empty file: the Range header was understood,
        // there is simply no byte 0 to return.
        _range_supported = true;
        VLOG(1) << "Range support detected (HTTP 416 on empty file) for " << _url;
    } else if (http_status == 200) {
        // HTTP 200 OK - server does not support Range requests
        // It returned the full file (or a large portion)
        _range_supported = false;
        VLOG(1) << "Range not supported (HTTP 200) for " << _url << ", received " << accepted
                << " bytes in test";

        // If we received a lot of data, it's likely the full file
        if (accepted >= MAX_TEST_SIZE || stopped_by_limit) {
            LOG(WARNING) << "Server returned " << received << "+ bytes for Range test, "
                         << "indicating no Range support for " << _url;
        }
    } else {
        // Unexpected status code
        LOG(WARNING) << "Unexpected HTTP status " << http_status << " during Range detection for "
                     << _url << ", assuming Range is not supported";
        _range_supported = false;
    }

    return Status::OK();
}
542
543
} // namespace doris::io