Coverage Report

Created: 2026-03-12 14:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/s3_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/s3_file_reader.h"
19
20
#include <aws/core/http/URI.h>
21
#include <aws/core/utils/Outcome.h>
22
#include <aws/s3/S3Client.h>
23
#include <aws/s3/S3Errors.h>
24
#include <aws/s3/model/GetObjectRequest.h>
25
#include <aws/s3/model/GetObjectResult.h>
26
#include <bvar/latency_recorder.h>
27
#include <bvar/reducer.h>
28
#include <fmt/format.h>
29
#include <glog/logging.h>
30
31
#include <algorithm>
32
#include <utility>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/metrics/doris_metrics.h"
36
#include "io/cache/block_file_cache.h"
37
#include "io/fs/err_utils.h"
38
#include "io/fs/obj_storage_client.h"
39
#include "io/fs/s3_common.h"
40
#include "runtime/runtime_profile.h"
41
#include "runtime/thread_context.h"
42
#include "runtime/workload_management/io_throttle.h"
43
#include "util/bvar_helper.h"
44
#include "util/concurrency_stats.h"
45
#include "util/debug_points.h"
46
#include "util/s3_util.h"
47
48
namespace doris::io {
49
50
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
51
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
52
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
53
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
54
bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
55
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
56
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
57
                                                           &s3_bytes_read_total);
58
// Although we can get QPS from s3_bytes_per_read, but s3_bytes_per_read only
59
// record successfull request, and s3_get_request_qps will record all request.
60
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
61
                                                          &s3_file_reader_read_counter);
62
bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency");
63
64
Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client,
65
                                            std::string bucket, std::string key, int64_t file_size,
66
32.5k
                                            RuntimeProfile* profile) {
67
32.5k
    if (file_size < 0) {
68
5.44k
        auto res = client->object_file_size(bucket, key);
69
5.44k
        if (!res.has_value()) {
70
3
            return ResultError(std::move(res.error()));
71
3
        }
72
73
5.44k
        file_size = res.value();
74
5.44k
    }
75
76
32.5k
    return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key),
77
32.5k
                                          file_size, profile);
78
32.5k
}
79
80
S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
81
                           std::string key, size_t file_size, RuntimeProfile* profile)
82
32.5k
        : _path(fmt::format("s3://{}/{}", bucket, key)),
83
32.5k
          _file_size(file_size),
84
32.5k
          _bucket(std::move(bucket)),
85
32.5k
          _key(std::move(key)),
86
32.5k
          _client(std::move(client)),
87
32.5k
          _profile(profile) {
88
32.5k
    DorisMetrics::instance()->s3_file_open_reading->increment(1);
89
32.5k
    DorisMetrics::instance()->s3_file_reader_total->increment(1);
90
32.5k
    s3_file_reader_total << 1;
91
32.5k
    s3_file_being_read << 1;
92
93
32.5k
    Aws::Http::SetCompliantRfc3986Encoding(true);
94
32.5k
}
95
96
32.5k
S3FileReader::~S3FileReader() {
97
32.5k
    static_cast<void>(close());
98
32.5k
    s3_file_being_read << -1;
99
32.5k
}
100
101
65.0k
Status S3FileReader::close() {
102
65.0k
    bool expected = false;
103
65.0k
    if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
104
32.5k
        DorisMetrics::instance()->s3_file_open_reading->increment(-1);
105
32.5k
    }
106
65.0k
    return Status::OK();
107
65.0k
}
108
109
Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
110
28.8k
                                  const IOContext* /*io_ctx*/) {
111
28.8k
    DCHECK(!closed());
112
28.8k
    if (offset > _file_size) {
113
0
        return Status::InternalError(
114
0
                "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
115
0
                _path.native());
116
0
    }
117
28.8k
    size_t bytes_req = result.size;
118
28.8k
    char* to = result.data;
119
28.8k
    bytes_req = std::min(bytes_req, _file_size - offset);
120
28.8k
    VLOG_DEBUG << fmt::format("S3FileReader::read_at_impl offset={} size={} path={} hash={}",
121
12
                              offset, result.size, _path.native(),
122
12
                              io::BlockFileCache::hash(_path.native()).to_string());
123
28.8k
    VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req
124
20
               << " req=" << result.size << " file size=" << _file_size;
125
28.8k
    if (UNLIKELY(bytes_req == 0)) {
126
0
        *bytes_read = 0;
127
0
        return Status::OK();
128
0
    }
129
130
28.8k
    auto client = _client->get();
131
28.8k
    if (!client) {
132
0
        return Status::InternalError("init s3 client error");
133
0
    }
134
135
28.8k
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);
136
137
28.8k
    int retry_count = 0;
138
28.8k
    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
139
28.8k
    const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
140
28.8k
    const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff
141
142
28.8k
    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
143
28.8k
                               std::chrono::system_clock::now().time_since_epoch())
144
28.8k
                               .count();
145
28.8k
    LIMIT_REMOTE_SCAN_IO(bytes_read);
146
28.8k
    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
147
28.8k
        auto sleep_time = dp->param("sleep", 3);
148
28.8k
        LOG_INFO("S3FileReader::read_at_impl.io_slow inject microseconds {} s", sleep_time)
149
28.8k
                .tag("bucket", _bucket)
150
28.8k
                .tag("key", _key);
151
28.8k
        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time));
152
28.8k
    });
153
28.8k
    Defer defer_latency {[&]() {
154
28.7k
        int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
155
28.7k
                                 std::chrono::system_clock::now().time_since_epoch())
156
28.7k
                                 .count();
157
28.7k
        s3_file_reader_latency << (end_ts - begin_ts);
158
28.7k
    }};
159
28.8k
    SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns);
160
161
28.8k
    int total_sleep_time = 0;
162
28.8k
    while (retry_count <= max_retries) {
163
28.7k
        *bytes_read = 0;
164
28.7k
        s3_file_reader_read_counter << 1;
165
        // clang-format off
166
28.7k
        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
167
28.7k
                to, offset, bytes_req, bytes_read);
168
        // clang-format on
169
28.7k
        _s3_stats.total_get_request_counter++;
170
28.7k
        if (resp.status.code != ErrorCode::OK) {
171
0
            if (resp.http_code ==
172
0
                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
173
0
                s3_file_reader_too_many_request_counter << 1;
174
0
                retry_count++;
175
0
                int wait_time = std::min(base_wait_time * (1 << retry_count),
176
0
                                         max_wait_time); // Exponential backoff
177
0
                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
178
0
                _s3_stats.too_many_request_err_counter++;
179
0
                _s3_stats.too_many_request_sleep_time_ms += wait_time;
180
0
                total_sleep_time += wait_time;
181
0
                continue;
182
0
            } else {
183
                // Handle other errors
184
0
                return std::move(Status(resp.status.code, std::move(resp.status.msg))
185
0
                                         .append("failed to read"));
186
0
            }
187
0
        }
188
28.7k
        if (*bytes_read != bytes_req) {
189
0
            std::string msg = fmt::format(
190
0
                    "failed to get object, path={} offset={} bytes_req={} bytes_read={} "
191
0
                    "file_size={} tries={}",
192
0
                    _path.native(), offset, bytes_req, *bytes_read, _file_size, (retry_count + 1));
193
0
            LOG(WARNING) << msg;
194
0
            return Status::InternalError(msg);
195
0
        }
196
28.7k
        _s3_stats.total_bytes_read += bytes_req;
197
28.7k
        s3_bytes_read_total << bytes_req;
198
28.7k
        s3_bytes_per_read << bytes_req;
199
28.7k
        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
200
28.7k
        if (retry_count > 0) {
201
0
            LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
202
0
                                     _path.native(), retry_count, total_sleep_time);
203
0
        }
204
28.7k
        return Status::OK();
205
28.7k
    }
206
8
    std::string msg = fmt::format(
207
8
            "failed to get object, path={} offset={} bytes_req={} bytes_read={} file_size={} "
208
8
            "tries={}",
209
8
            _path.native(), offset, bytes_req, *bytes_read, _file_size, (max_retries + 1));
210
8
    LOG(WARNING) << msg;
211
8
    return Status::InternalError(msg);
212
28.8k
}
213
214
27.4k
void S3FileReader::_collect_profile_before_close() {
215
27.4k
    if (_profile != nullptr) {
216
27.4k
        const char* s3_profile_name = "S3Profile";
217
27.4k
        ADD_TIMER(_profile, s3_profile_name);
218
27.4k
        RuntimeProfile::Counter* total_get_request_counter =
219
27.4k
                ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name);
220
27.4k
        RuntimeProfile::Counter* too_many_request_err_counter =
221
27.4k
                ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name);
222
27.4k
        RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER(
223
27.4k
                _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name);
224
27.4k
        RuntimeProfile::Counter* total_bytes_read =
225
27.4k
                ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name);
226
27.4k
        RuntimeProfile::Counter* total_get_request_time_ns =
227
27.4k
                ADD_CHILD_TIMER(_profile, "TotalGetRequestTime", s3_profile_name);
228
229
27.4k
        COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter);
230
27.4k
        COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter);
231
27.4k
        COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms);
232
27.4k
        COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read);
233
27.4k
        COUNTER_UPDATE(total_get_request_time_ns, _s3_stats.total_get_request_time_ns);
234
27.4k
    }
235
27.4k
}
236
237
} // namespace doris::io