be/src/io/fs/s3_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/s3_file_reader.h" |
19 | | |
20 | | #include <aws/core/http/URI.h> |
21 | | #include <aws/core/utils/Outcome.h> |
22 | | #include <aws/s3/S3Client.h> |
23 | | #include <aws/s3/S3Errors.h> |
24 | | #include <aws/s3/model/GetObjectRequest.h> |
25 | | #include <aws/s3/model/GetObjectResult.h> |
26 | | #include <bvar/latency_recorder.h> |
27 | | #include <bvar/reducer.h> |
28 | | #include <fmt/format.h> |
29 | | #include <glog/logging.h> |
30 | | |
31 | | #include <algorithm> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/metrics/doris_metrics.h" |
36 | | #include "io/cache/block_file_cache.h" |
37 | | #include "io/fs/err_utils.h" |
38 | | #include "io/fs/obj_storage_client.h" |
39 | | #include "io/fs/s3_common.h" |
40 | | #include "runtime/runtime_profile.h" |
41 | | #include "runtime/thread_context.h" |
42 | | #include "runtime/workload_management/io_throttle.h" |
43 | | #include "util/bvar_helper.h" |
44 | | #include "util/concurrency_stats.h" |
45 | | #include "util/debug_points.h" |
46 | | #include "util/s3_util.h" |
47 | | |
48 | | namespace doris::io { |
49 | | |
// Process-wide bvar metrics for S3 reads, all exported under the "s3_file_reader" prefix.
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
// Gauge-like counter: incremented in the S3FileReader constructor, decremented in the destructor.
bvar::Adder<uint64_t> s3_file_being_read("s3_file_reader", "file_being_read");
bvar::Adder<uint64_t> s3_file_reader_too_many_request_counter("s3_file_reader", "too_many_request");
bvar::LatencyRecorder s3_bytes_per_read("s3_file_reader", "bytes_per_read"); // also QPS
bvar::PerSecond<bvar::Adder<uint64_t>> s3_read_througthput("s3_file_reader", "s3_read_throughput",
                                                           &s3_bytes_read_total);
// Although we could derive QPS from s3_bytes_per_read, that recorder only counts
// successful requests; s3_get_request_qps counts all GET requests, including failures.
bvar::PerSecond<bvar::Adder<uint64_t>> s3_get_request_qps("s3_file_reader", "s3_get_request",
                                                          &s3_file_reader_read_counter);
bvar::LatencyRecorder s3_file_reader_latency("s3_file_reader", "s3_latency");
63 | | |
64 | | Result<FileReaderSPtr> S3FileReader::create(std::shared_ptr<const ObjClientHolder> client, |
65 | | std::string bucket, std::string key, int64_t file_size, |
66 | 32.5k | RuntimeProfile* profile) { |
67 | 32.5k | if (file_size < 0) { |
68 | 5.44k | auto res = client->object_file_size(bucket, key); |
69 | 5.44k | if (!res.has_value()) { |
70 | 3 | return ResultError(std::move(res.error())); |
71 | 3 | } |
72 | | |
73 | 5.44k | file_size = res.value(); |
74 | 5.44k | } |
75 | | |
76 | 32.5k | return std::make_shared<S3FileReader>(std::move(client), std::move(bucket), std::move(key), |
77 | 32.5k | file_size, profile); |
78 | 32.5k | } |
79 | | |
// Constructs a reader for s3://<bucket>/<key> with a known file size, registering
// the open-reader metrics. Destruction (or close()) reverses the gauge updates.
S3FileReader::S3FileReader(std::shared_ptr<const ObjClientHolder> client, std::string bucket,
                           std::string key, size_t file_size, RuntimeProfile* profile)
        // NOTE: _path is formatted from `bucket`/`key` while they are still intact.
        // Members are initialized in declaration order, and _path is declared before
        // _bucket/_key, so the std::move()s below run only after _path is built.
        // Do not reorder the member declarations.
        : _path(fmt::format("s3://{}/{}", bucket, key)),
          _file_size(file_size),
          _bucket(std::move(bucket)),
          _key(std::move(key)),
          _client(std::move(client)),
          _profile(profile) {
    DorisMetrics::instance()->s3_file_open_reading->increment(1);
    DorisMetrics::instance()->s3_file_reader_total->increment(1);
    s3_file_reader_total << 1;
    s3_file_being_read << 1;

    // Make the AWS SDK percent-encode URIs per RFC 3986.
    // NOTE(review): this mutates process-wide SDK state on every reader construction —
    // presumably idempotent and cheap, but confirm it is safe to call repeatedly
    // and consider hoisting it to one-time client initialization.
    Aws::Http::SetCompliantRfc3986Encoding(true);
}
95 | | |
96 | 32.5k | S3FileReader::~S3FileReader() { |
97 | 32.5k | static_cast<void>(close()); |
98 | 32.5k | s3_file_being_read << -1; |
99 | 32.5k | } |
100 | | |
101 | 65.0k | Status S3FileReader::close() { |
102 | 65.0k | bool expected = false; |
103 | 65.0k | if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
104 | 32.5k | DorisMetrics::instance()->s3_file_open_reading->increment(-1); |
105 | 32.5k | } |
106 | 65.0k | return Status::OK(); |
107 | 65.0k | } |
108 | | |
// Reads up to result.size bytes starting at `offset` into result.data, clamping the
// request at EOF. Retries GET requests that fail with HTTP 429 (TOO_MANY_REQUESTS)
// using exponential backoff, up to config::max_s3_client_retry attempts; all other
// errors, and short reads, fail immediately. On success *bytes_read == min(result.size,
// _file_size - offset). Latency (including backoff sleeps) is recorded via bvar.
Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                  const IOContext* /*io_ctx*/) {
    DCHECK(!closed());
    if (offset > _file_size) {
        return Status::InternalError(
                "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size,
                _path.native());
    }
    size_t bytes_req = result.size;
    char* to = result.data;
    // Clamp the request so we never ask for bytes past EOF.
    bytes_req = std::min(bytes_req, _file_size - offset);
    VLOG_DEBUG << fmt::format("S3FileReader::read_at_impl offset={} size={} path={} hash={}",
                              offset, result.size, _path.native(),
                              io::BlockFileCache::hash(_path.native()).to_string());
    VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req
               << " req=" << result.size << " file size=" << _file_size;
    if (UNLIKELY(bytes_req == 0)) {
        // Reading at (or past-clamped-to) EOF: report zero bytes, not an error.
        *bytes_read = 0;
        return Status::OK();
    }

    auto client = _client->get();
    if (!client) {
        return Status::InternalError("init s3 client error");
    }

    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().s3_file_reader_read);

    int retry_count = 0;
    const int base_wait_time = config::s3_read_base_wait_time_ms; // Base wait time in milliseconds
    const int max_wait_time = config::s3_read_max_wait_time_ms; // Maximum wait time in milliseconds
    const int max_retries = config::max_s3_client_retry; // wait 1s, 2s, 4s, 8s for each backoff

    int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                               std::chrono::system_clock::now().time_since_epoch())
                               .count();
    LIMIT_REMOTE_SCAN_IO(bytes_read);
    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
        auto sleep_time = dp->param("sleep", 3);
        // NOTE(review): the log text reads like seconds ("{} s") but the sleep below is in
        // microseconds — confirm the intended unit of the "sleep" debug parameter.
        LOG_INFO("S3FileReader::read_at_impl.io_slow inject microseconds {} s", sleep_time)
                .tag("bucket", _bucket)
                .tag("key", _key);
        std::this_thread::sleep_for(std::chrono::microseconds(sleep_time));
    });
    // Record end-to-end latency of this call (including retry/backoff sleeps) on every
    // exit path via the scope-exit Defer.
    Defer defer_latency {[&]() {
        int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                 std::chrono::system_clock::now().time_since_epoch())
                                 .count();
        s3_file_reader_latency << (end_ts - begin_ts);
    }};
    SCOPED_RAW_TIMER(&_s3_stats.total_get_request_time_ns);

    int total_sleep_time = 0;
    while (retry_count <= max_retries) {
        *bytes_read = 0;
        s3_file_reader_read_counter << 1;
        // clang-format off
        auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
                to, offset, bytes_req, bytes_read);
        // clang-format on
        _s3_stats.total_get_request_counter++;
        if (resp.status.code != ErrorCode::OK) {
            if (resp.http_code ==
                static_cast<int>(Aws::Http::HttpResponseCode::TOO_MANY_REQUESTS)) {
                // Throttled by the object store: back off exponentially and retry.
                s3_file_reader_too_many_request_counter << 1;
                retry_count++;
                int wait_time = std::min(base_wait_time * (1 << retry_count),
                                         max_wait_time); // Exponential backoff
                std::this_thread::sleep_for(std::chrono::milliseconds(wait_time));
                _s3_stats.too_many_request_err_counter++;
                _s3_stats.too_many_request_sleep_time_ms += wait_time;
                total_sleep_time += wait_time;
                continue;
            } else {
                // Handle other errors
                // NOTE(review): std::move on a prvalue is redundant (pessimizing-move);
                // a plain `return Status(...).append(...)` would be equivalent.
                return std::move(Status(resp.status.code, std::move(resp.status.msg))
                                         .append("failed to read"));
            }
        }
        // The range was clamped to EOF above, so a short read is unexpected — fail hard
        // rather than return partial data.
        if (*bytes_read != bytes_req) {
            std::string msg = fmt::format(
                    "failed to get object, path={} offset={} bytes_req={} bytes_read={} "
                    "file_size={} tries={}",
                    _path.native(), offset, bytes_req, *bytes_read, _file_size, (retry_count + 1));
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }
        _s3_stats.total_bytes_read += bytes_req;
        s3_bytes_read_total << bytes_req;
        s3_bytes_per_read << bytes_req;
        DorisMetrics::instance()->s3_bytes_read_total->increment(bytes_req);
        if (retry_count > 0) {
            LOG(INFO) << fmt::format("read s3 file {} succeed after {} times with {} ms sleeping",
                                     _path.native(), retry_count, total_sleep_time);
        }
        return Status::OK();
    }
    // Reached only when every attempt was rejected with TOO_MANY_REQUESTS.
    std::string msg = fmt::format(
            "failed to get object, path={} offset={} bytes_req={} bytes_read={} file_size={} "
            "tries={}",
            _path.native(), offset, bytes_req, *bytes_read, _file_size, (max_retries + 1));
    LOG(WARNING) << msg;
    return Status::InternalError(msg);
}
213 | | |
214 | 27.4k | void S3FileReader::_collect_profile_before_close() { |
215 | 27.4k | if (_profile != nullptr) { |
216 | 27.4k | const char* s3_profile_name = "S3Profile"; |
217 | 27.4k | ADD_TIMER(_profile, s3_profile_name); |
218 | 27.4k | RuntimeProfile::Counter* total_get_request_counter = |
219 | 27.4k | ADD_CHILD_COUNTER(_profile, "TotalGetRequest", TUnit::UNIT, s3_profile_name); |
220 | 27.4k | RuntimeProfile::Counter* too_many_request_err_counter = |
221 | 27.4k | ADD_CHILD_COUNTER(_profile, "TooManyRequestErr", TUnit::UNIT, s3_profile_name); |
222 | 27.4k | RuntimeProfile::Counter* too_many_request_sleep_time = ADD_CHILD_COUNTER( |
223 | 27.4k | _profile, "TooManyRequestSleepTime", TUnit::TIME_MS, s3_profile_name); |
224 | 27.4k | RuntimeProfile::Counter* total_bytes_read = |
225 | 27.4k | ADD_CHILD_COUNTER(_profile, "TotalBytesRead", TUnit::BYTES, s3_profile_name); |
226 | 27.4k | RuntimeProfile::Counter* total_get_request_time_ns = |
227 | 27.4k | ADD_CHILD_TIMER(_profile, "TotalGetRequestTime", s3_profile_name); |
228 | | |
229 | 27.4k | COUNTER_UPDATE(total_get_request_counter, _s3_stats.total_get_request_counter); |
230 | 27.4k | COUNTER_UPDATE(too_many_request_err_counter, _s3_stats.too_many_request_err_counter); |
231 | 27.4k | COUNTER_UPDATE(too_many_request_sleep_time, _s3_stats.too_many_request_sleep_time_ms); |
232 | 27.4k | COUNTER_UPDATE(total_bytes_read, _s3_stats.total_bytes_read); |
233 | 27.4k | COUNTER_UPDATE(total_get_request_time_ns, _s3_stats.total_get_request_time_ns); |
234 | 27.4k | } |
235 | 27.4k | } |
236 | | |
237 | | } // namespace doris::io |