be/src/io/fs/stream_load_pipe.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/stream_load_pipe.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <ostream> |
24 | | #include <utility> |
25 | | |
26 | | #include "common/compiler_util.h" // IWYU pragma: keep |
27 | | #include "common/status.h" |
28 | | #include "core/custom_allocator.h" |
29 | | #include "runtime/exec_env.h" |
30 | | #include "runtime/thread_context.h" |
31 | | #include "util/bit_util.h" |
32 | | |
33 | | namespace doris { |
34 | | namespace io { |
35 | | struct IOContext; |
36 | | |
37 | | StreamLoadPipe::StreamLoadPipe(size_t max_buffered_bytes, size_t min_chunk_size, |
38 | | int64_t total_length, bool use_proto) |
39 | 2.44k | : _buffered_bytes(0), |
40 | 2.44k | _proto_buffered_bytes(0), |
41 | 2.44k | _max_buffered_bytes(max_buffered_bytes), |
42 | 2.44k | _min_chunk_size(min_chunk_size), |
43 | 2.44k | _total_length(total_length), |
44 | 2.44k | _use_proto(use_proto) {} |
45 | | |
46 | 2.44k | StreamLoadPipe::~StreamLoadPipe() { |
47 | 4.18k | while (!_buf_queue.empty()) { |
48 | 1.73k | _buf_queue.pop_front(); |
49 | 1.73k | } |
50 | 2.44k | } |
51 | | |
52 | | Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read, |
53 | 4.64k | const IOContext* /*io_ctx*/) { |
54 | 4.64k | *bytes_read = 0; |
55 | 4.64k | size_t bytes_req = result.size; |
56 | 4.64k | char* to = result.data; |
57 | 4.64k | if (UNLIKELY(bytes_req == 0)) { |
58 | 10 | return Status::OK(); |
59 | 10 | } |
60 | 308k | while (*bytes_read < bytes_req) { |
61 | 308k | std::unique_lock<std::mutex> l(_lock); |
62 | 420k | while (!_cancelled && !_finished && _buf_queue.empty()) { |
63 | 112k | _get_cond.wait(l); |
64 | 112k | } |
65 | | // cancelled |
66 | 308k | if (_cancelled) { |
67 | 0 | return Status::Cancelled("cancelled: {}", _cancelled_reason); |
68 | 0 | } |
69 | | // finished |
70 | 308k | if (_buf_queue.empty()) { |
71 | 4.14k | DCHECK(_finished); |
72 | | // break the while loop |
73 | 4.14k | bytes_req = *bytes_read; |
74 | 4.14k | return Status::OK(); |
75 | 4.14k | } |
76 | 304k | auto buf = _buf_queue.front(); |
77 | 304k | int64_t copy_size = std::min(bytes_req - *bytes_read, buf->remaining()); |
78 | 304k | buf->get_bytes(to + *bytes_read, copy_size); |
79 | 304k | *bytes_read += copy_size; |
80 | 304k | if (!buf->has_remaining()) { |
81 | 303k | _buf_queue.pop_front(); |
82 | 303k | _buffered_bytes -= buf->limit; |
83 | 303k | _put_cond.notify_one(); |
84 | 303k | } |
85 | 304k | } |
86 | 4.63k | DCHECK(*bytes_read == bytes_req) |
87 | 0 | << "*bytes_read=" << *bytes_read << ", bytes_req=" << bytes_req; |
88 | 494 | return Status::OK(); |
89 | 4.63k | } |
90 | | |
91 | | // If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request, |
92 | | // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. |
93 | | // Otherwise, this should be a stream load task that needs to read the specified amount of data. |
94 | 626 | Status StreamLoadPipe::read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) { |
95 | 626 | if (_total_length < -1) { |
96 | 0 | return Status::InternalError("invalid, _total_length is: {}", _total_length); |
97 | 626 | } else if (_total_length == 0) { |
98 | | // no data |
99 | 0 | *length = 0; |
100 | 0 | return Status::OK(); |
101 | 0 | } |
102 | | |
103 | 626 | if (_total_length == -1) { |
104 | 516 | return _read_next_buffer(data, length); |
105 | 516 | } |
106 | | |
107 | | // _total_length > 0, read the entire data |
108 | 110 | *data = make_unique_buffer<uint8_t>(_total_length); |
109 | 110 | Slice result(data->get(), _total_length); |
110 | 110 | Status st = read_at(0, result, length); |
111 | 110 | return st; |
112 | 626 | } |
113 | | |
114 | 280 | Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { |
115 | 280 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
116 | 280 | ByteBufferPtr buf; |
117 | 280 | RETURN_IF_ERROR(ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1), &buf)); |
118 | 280 | buf->put_bytes(data, size); |
119 | 280 | buf->flip(); |
120 | 280 | return _append(buf, proto_byte_size); |
121 | 280 | } |
122 | | |
123 | 148 | Status StreamLoadPipe::append(std::unique_ptr<PDataRow>&& row) { |
124 | 148 | PDataRow* row_ptr = row.get(); |
125 | 148 | { |
126 | 148 | std::unique_lock<std::mutex> l(_lock); |
127 | 148 | _data_row_ptrs.emplace_back(std::move(row)); |
128 | 148 | } |
129 | 148 | return append_and_flush(reinterpret_cast<char*>(&row_ptr), sizeof(row_ptr), |
130 | 148 | sizeof(PDataRow*) + row_ptr->ByteSizeLong()); |
131 | 148 | } |
132 | | |
133 | 1.08k | Status StreamLoadPipe::append(const char* data, size_t size) { |
134 | 1.08k | size_t pos = 0; |
135 | 1.08k | if (_write_buf != nullptr) { |
136 | 1.02k | if (size < _write_buf->remaining()) { |
137 | 1.02k | _write_buf->put_bytes(data, size); |
138 | 1.02k | return Status::OK(); |
139 | 1.02k | } else { |
140 | 0 | pos = _write_buf->remaining(); |
141 | 0 | _write_buf->put_bytes(data, pos); |
142 | |
|
143 | 0 | _write_buf->flip(); |
144 | 0 | RETURN_IF_ERROR(_append(_write_buf)); |
145 | 0 | _write_buf.reset(); |
146 | 0 | } |
147 | 1.02k | } |
148 | | // need to allocate a new chunk, min chunk is 64k |
149 | 58 | size_t chunk_size = std::max(_min_chunk_size, size - pos); |
150 | 58 | chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); |
151 | 58 | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
152 | 58 | RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf)); |
153 | 58 | _write_buf->put_bytes(data + pos, size - pos); |
154 | 58 | return Status::OK(); |
155 | 58 | } |
156 | | |
157 | 305k | Status StreamLoadPipe::append(const ByteBufferPtr& buf) { |
158 | 305k | if (_write_buf != nullptr) { |
159 | 0 | _write_buf->flip(); |
160 | 0 | RETURN_IF_ERROR(_append(_write_buf)); |
161 | 0 | _write_buf.reset(); |
162 | 0 | } |
163 | 305k | return _append(buf); |
164 | 305k | } |
165 | | |
166 | | // read the next buffer from _buf_queue |
167 | 516 | Status StreamLoadPipe::_read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) { |
168 | 516 | std::unique_lock<std::mutex> l(_lock); |
169 | 691 | while (!_cancelled && !_finished && _buf_queue.empty()) { |
170 | 175 | _get_cond.wait(l); |
171 | 175 | } |
172 | | // cancelled |
173 | 516 | if (_cancelled) { |
174 | 5 | return Status::Cancelled("cancelled: {}", _cancelled_reason); |
175 | 5 | } |
176 | | // finished |
177 | 511 | if (_buf_queue.empty()) { |
178 | 92 | DCHECK(_finished); |
179 | 92 | data->reset(); |
180 | 92 | *length = 0; |
181 | 92 | return Status::OK(); |
182 | 92 | } |
183 | 419 | auto buf = _buf_queue.front(); |
184 | 419 | *length = buf->remaining(); |
185 | 419 | *data = make_unique_buffer<uint8_t>(*length); |
186 | 419 | buf->get_bytes((char*)(data->get()), *length); |
187 | 419 | _buf_queue.pop_front(); |
188 | 419 | _buffered_bytes -= buf->limit; |
189 | 419 | if (_use_proto) { |
190 | 148 | auto row_ptr = std::move(_data_row_ptrs.front()); |
191 | 148 | _proto_buffered_bytes -= (sizeof(PDataRow*) + row_ptr->GetCachedSize()); |
192 | 148 | _data_row_ptrs.pop_front(); |
193 | | // PlainBinaryLineReader will hold the PDataRow |
194 | 148 | row_ptr.release(); |
195 | 148 | } |
196 | 419 | _put_cond.notify_one(); |
197 | 419 | return Status::OK(); |
198 | 511 | } |
199 | | |
200 | 305k | Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t proto_byte_size) { |
201 | 305k | { |
202 | 305k | std::unique_lock<std::mutex> l(_lock); |
203 | | // if _buf_queue is empty, we append this buf without size check |
204 | 305k | if (_use_proto) { |
205 | 148 | while (!_cancelled && !_buf_queue.empty() && |
206 | 148 | (_proto_buffered_bytes + proto_byte_size > _max_buffered_bytes)) { |
207 | 0 | _put_cond.wait(l); |
208 | 0 | } |
209 | 305k | } else { |
210 | 305k | while (!_cancelled && !_buf_queue.empty() && |
211 | 305k | _buffered_bytes + buf->remaining() > _max_buffered_bytes) { |
212 | 200 | _put_cond.wait(l); |
213 | 200 | } |
214 | 305k | } |
215 | 305k | if (_cancelled) { |
216 | 3 | return Status::Cancelled("cancelled: {}", _cancelled_reason); |
217 | 3 | } |
218 | 305k | _buf_queue.push_back(buf); |
219 | 305k | if (_use_proto) { |
220 | 148 | _proto_buffered_bytes += proto_byte_size; |
221 | 305k | } else { |
222 | 305k | _buffered_bytes += buf->remaining(); |
223 | 305k | } |
224 | 305k | } |
225 | 0 | _get_cond.notify_one(); |
226 | 305k | return Status::OK(); |
227 | 305k | } |
228 | | |
229 | | // called when producer finished |
230 | 2.25k | Status StreamLoadPipe::finish() { |
231 | 2.25k | if (_write_buf != nullptr) { |
232 | 57 | _write_buf->flip(); |
233 | 57 | RETURN_IF_ERROR(_append(_write_buf)); |
234 | 57 | _write_buf.reset(); |
235 | 57 | } |
236 | 2.25k | { |
237 | 2.25k | std::lock_guard<std::mutex> l(_lock); |
238 | 2.25k | _finished = true; |
239 | 2.25k | } |
240 | 2.25k | _get_cond.notify_all(); |
241 | 2.25k | return Status::OK(); |
242 | 2.25k | } |
243 | | |
244 | | // called when producer/consumer failed |
245 | 3.34k | void StreamLoadPipe::cancel(const std::string& reason) { |
246 | 3.34k | { |
247 | 3.34k | std::lock_guard<std::mutex> l(_lock); |
248 | 3.34k | _cancelled = true; |
249 | 3.34k | _cancelled_reason = reason; |
250 | 3.34k | } |
251 | 3.34k | _get_cond.notify_all(); |
252 | 3.34k | _put_cond.notify_all(); |
253 | 3.34k | } |
254 | | |
255 | 0 | TUniqueId StreamLoadPipe::calculate_pipe_id(const UniqueId& query_id, int32_t fragment_id) { |
256 | 0 | TUniqueId pipe_id; |
257 | 0 | pipe_id.lo = query_id.lo + fragment_id; |
258 | 0 | pipe_id.hi = query_id.hi; |
259 | 0 | return pipe_id; |
260 | 0 | } |
261 | | |
262 | 15 | size_t StreamLoadPipe::current_capacity() { |
263 | 15 | std::unique_lock<std::mutex> l(_lock); |
264 | 15 | if (_use_proto) { |
265 | 0 | return _proto_buffered_bytes; |
266 | 15 | } else { |
267 | 15 | return _buffered_bytes; |
268 | 15 | } |
269 | 15 | } |
270 | | |
271 | | } // namespace io |
272 | | } // namespace doris |