be/src/service/http/action/stream_load.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/stream_load.h" |
19 | | |
20 | | // use string iequal |
21 | | #include <event2/buffer.h> |
22 | | #include <event2/http.h> |
23 | | #include <gen_cpp/FrontendService.h> |
24 | | #include <gen_cpp/FrontendService_types.h> |
25 | | #include <gen_cpp/HeartbeatService_types.h> |
26 | | #include <gen_cpp/PaloInternalService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | #include <sys/time.h> |
30 | | #include <thrift/protocol/TDebugProtocol.h> |
31 | | |
32 | | #include <algorithm> |
33 | | #include <cstdint> |
34 | | #include <cstdlib> |
35 | | #include <ctime> |
36 | | #include <functional> |
37 | | #include <future> |
38 | | #include <sstream> |
39 | | #include <stdexcept> |
40 | | #include <utility> |
41 | | |
42 | | #include "cloud/config.h" |
43 | | #include "common/config.h" |
44 | | #include "common/consts.h" |
45 | | #include "common/logging.h" |
46 | | #include "common/metrics/doris_metrics.h" |
47 | | #include "common/metrics/metrics.h" |
48 | | #include "common/status.h" |
49 | | #include "common/utils.h" |
50 | | #include "io/fs/stream_load_pipe.h" |
51 | | #include "load/group_commit/group_commit_mgr.h" |
52 | | #include "load/load_path_mgr.h" |
53 | | #include "load/message_body_sink.h" |
54 | | #include "load/stream_load/new_load_stream_mgr.h" |
55 | | #include "load/stream_load/stream_load_context.h" |
56 | | #include "load/stream_load/stream_load_executor.h" |
57 | | #include "load/stream_load/stream_load_recorder.h" |
58 | | #include "runtime/exec_env.h" |
59 | | #include "service/http/http_channel.h" |
60 | | #include "service/http/http_common.h" |
61 | | #include "service/http/http_headers.h" |
62 | | #include "service/http/http_request.h" |
63 | | #include "service/http/utils.h" |
64 | | #include "storage/storage_engine.h" |
65 | | #include "util/byte_buffer.h" |
66 | | #include "util/client_cache.h" |
67 | | #include "util/load_util.h" |
68 | | #include "util/string_util.h" |
69 | | #include "util/thrift_rpc_helper.h" |
70 | | #include "util/time.h" |
71 | | #include "util/uid_util.h" |
72 | | #include "util/url_coding.h" |
73 | | |
74 | | namespace doris { |
75 | | using namespace ErrorCode; |
76 | | |
// Metric prototypes for the "stream_load" entity; they are registered in the
// StreamLoadAction constructor. requests_total and duration_ms are bumped once per reply
// in _send_reply(); current_processing is incremented in on_header() and decremented in
// free_handler_ctx().
77 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS); |
78 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS); |
79 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS); |
80 | | |
// Latency recorders: the first is fed from on_chunk_data(), the second from _on_finish()
// after commit_txn(). Both receive values already divided down to milliseconds.
81 | | bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms"); |
82 | | bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load", |
83 | | "commit_and_publish_ms"); |
84 | | |
// MIN_CHUNK_SIZE is handed to StreamLoadPipe as its min_chunk_size for non-chunked uploads.
85 | | static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024; |
// Substring searched for (case-sensitively) in the Transfer-Encoding header.
86 | | static const std::string CHUNK = "chunked"; |
// NOTE(review): presumably the accepted group-commit mode names; they are not referenced
// in the part of the file visible here - confirm against _handle_group_commit().
87 | | static const std::string OFF_MODE = "off_mode"; |
88 | | static const std::string SYNC_MODE = "sync_mode"; |
89 | | static const std::string ASYNC_MODE = "async_mode"; |
90 | | |
// Only compiled for unit tests. NOTE(review): presumably a canned put result that tests
// inject instead of calling the frontend - confirm at its point of use.
91 | | #ifdef BE_TEST |
92 | | TStreamLoadPutResult k_stream_load_put_result; |
93 | | #endif |
94 | | |
95 | 8 | StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) { |
96 | 8 | _stream_load_entity = |
97 | 8 | DorisMetrics::instance()->metric_registry()->register_entity("stream_load"); |
98 | 8 | INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total); |
99 | 8 | INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms); |
100 | 8 | INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing); |
101 | 8 | } |
102 | | |
103 | 4 | StreamLoadAction::~StreamLoadAction() { |
104 | 4 | DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity); |
105 | 4 | } |
106 | | |
107 | 1.89k | void StreamLoadAction::handle(HttpRequest* req) { |
108 | 1.89k | std::shared_ptr<StreamLoadContext> ctx = |
109 | 1.89k | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
110 | 1.89k | if (ctx == nullptr) { |
111 | 0 | return; |
112 | 0 | } |
113 | | |
114 | 1.89k | { |
115 | 1.89k | std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock); |
116 | 1.89k | ctx->_can_send_reply = true; |
117 | 1.89k | ctx->_can_send_reply_cv.notify_all(); |
118 | 1.89k | } |
119 | | |
120 | | // status already set to fail |
121 | 1.89k | if (ctx->status.ok()) { |
122 | 1.89k | ctx->status = _handle(ctx, req); |
123 | 1.89k | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
124 | 0 | _send_reply(ctx, req); |
125 | 0 | } |
126 | 1.89k | } |
127 | 1.89k | } |
128 | | |
129 | 1.89k | Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { |
130 | 1.89k | if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { |
131 | 0 | LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes |
132 | 0 | << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; |
133 | 0 | return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes"); |
134 | 0 | } |
135 | | |
136 | | // if we use non-streaming, MessageBodyFileSink.finish will close the file |
137 | 1.89k | RETURN_IF_ERROR(ctx->body_sink->finish()); |
138 | 1.89k | if (!ctx->use_streaming) { |
139 | | // we need to close file first, then execute_plan_fragment here |
140 | 17 | ctx->body_sink.reset(); |
141 | 17 | TPipelineFragmentParamsList mocked; |
142 | 17 | RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment( |
143 | 17 | ctx, mocked, |
144 | 17 | [req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); })); |
145 | 17 | } |
146 | | |
147 | 1.89k | return Status::OK(); |
148 | 1.89k | } |
149 | | |
150 | 1.89k | void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { |
151 | 1.89k | ctx->status = ctx->load_status_future.get(); |
152 | 1.89k | if (ctx->status.ok()) { |
153 | 1.57k | if (ctx->group_commit) { |
154 | 23 | LOG(INFO) << "skip commit because this is group commit, pipe_id=" |
155 | 23 | << ctx->id.to_string(); |
156 | 1.55k | } else if (ctx->two_phase_commit) { |
157 | 32 | int64_t pre_commit_start_time = MonotonicNanos(); |
158 | 32 | ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get()); |
159 | 32 | ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; |
160 | 1.52k | } else { |
161 | | // If put file success we need commit this load |
162 | 1.52k | int64_t commit_and_publish_start_time = MonotonicNanos(); |
163 | 1.52k | ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get()); |
164 | 1.52k | ctx->commit_and_publish_txn_cost_nanos = |
165 | 1.52k | MonotonicNanos() - commit_and_publish_start_time; |
166 | 1.52k | g_stream_load_commit_and_publish_latency_ms |
167 | 1.52k | << ctx->commit_and_publish_txn_cost_nanos / 1000000; |
168 | 1.52k | } |
169 | 1.57k | } |
170 | 1.89k | _send_reply(ctx, req); |
171 | 1.89k | } |
172 | | |
173 | 2.07k | void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) { |
174 | 2.07k | std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock); |
175 | | // 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle complete, |
176 | | // avoid client errors (e.g., broken pipe). |
177 | | // 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled |
178 | | // due to long import execution time. |
179 | 2.08k | while (!ctx->_finish_send_reply && !ctx->_can_send_reply) { |
180 | 3 | ctx->_can_send_reply_cv.wait(lock1); |
181 | 3 | } |
182 | 2.07k | if (ctx->_finish_send_reply) { |
183 | 0 | return; |
184 | 0 | } |
185 | 2.07k | DCHECK(ctx->_can_send_reply); |
186 | 2.07k | ctx->_finish_send_reply = true; |
187 | 2.07k | ctx->_can_send_reply_cv.notify_all(); |
188 | 2.07k | ctx->load_cost_millis = UnixMillis() - ctx->start_millis; |
189 | | |
190 | 2.07k | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
191 | 498 | LOG(WARNING) << "handle streaming load failed, id=" << ctx->id |
192 | 498 | << ", errmsg=" << ctx->status; |
193 | 498 | if (ctx->need_rollback) { |
194 | 467 | _exec_env->stream_load_executor()->rollback_txn(ctx.get()); |
195 | 467 | ctx->need_rollback = false; |
196 | 467 | } |
197 | 498 | if (ctx->body_sink != nullptr) { |
198 | 467 | ctx->body_sink->cancel(ctx->status.to_string()); |
199 | 467 | } |
200 | 498 | } |
201 | | |
202 | 2.07k | auto str = ctx->to_json(); |
203 | | // add new line at end |
204 | 2.07k | str = str + '\n'; |
205 | | |
206 | 2.07k | #ifndef BE_TEST |
207 | 2.07k | if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { |
208 | 2.07k | if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { |
209 | 2.02k | str = ctx->prepare_stream_load_record(str); |
210 | 2.02k | _save_stream_load_record(ctx, str); |
211 | 2.02k | } |
212 | 2.07k | } |
213 | 2.07k | #endif |
214 | | |
215 | 2.07k | HttpChannel::send_reply(req, str); |
216 | | |
217 | 2.07k | LOG(INFO) << "finished to execute stream load. label=" << ctx->label |
218 | 2.07k | << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id |
219 | 2.07k | << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms=" |
220 | 2.07k | << (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000 |
221 | 2.07k | << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000 |
222 | 2.07k | << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000 |
223 | 2.07k | << ", commit_and_publish_txn_cost_ms=" |
224 | 2.07k | << ctx->commit_and_publish_txn_cost_nanos / 1000000 |
225 | 2.07k | << ", number_total_rows=" << ctx->number_total_rows |
226 | 2.07k | << ", number_loaded_rows=" << ctx->number_loaded_rows |
227 | 2.07k | << ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes |
228 | 2.07k | << ", error_url=" << ctx->error_url; |
229 | | |
230 | | // update statistics |
231 | 2.07k | streaming_load_requests_total->increment(1); |
232 | 2.07k | streaming_load_duration_ms->increment(ctx->load_cost_millis); |
233 | 2.07k | if (!ctx->data_saved_path.empty()) { |
234 | 18 | _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path); |
235 | 18 | } |
236 | 2.07k | } |
237 | | |
238 | 2.07k | int StreamLoadAction::on_header(HttpRequest* req) { |
239 | 2.07k | req->mark_send_reply(); |
240 | | |
241 | 2.07k | streaming_load_current_processing->increment(1); |
242 | | |
243 | 2.07k | std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); |
244 | 2.07k | req->set_handler_ctx(ctx); |
245 | | |
246 | 2.07k | ctx->load_type = TLoadType::MANUL_LOAD; |
247 | 2.07k | ctx->load_src_type = TLoadSourceType::RAW; |
248 | | |
249 | 2.07k | url_decode(req->param(HTTP_DB_KEY), &ctx->db); |
250 | 2.07k | url_decode(req->param(HTTP_TABLE_KEY), &ctx->table); |
251 | 2.07k | ctx->label = req->header(HTTP_LABEL_KEY); |
252 | 2.07k | ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; |
253 | 2.07k | Status st = _handle_group_commit(req, ctx); |
254 | 2.07k | if (!ctx->group_commit && ctx->label.empty()) { |
255 | 339 | ctx->label = generate_uuid_string(); |
256 | 339 | } |
257 | | |
258 | 2.07k | LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db |
259 | 2.07k | << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit |
260 | 2.07k | << ", group_commit_mode=" << ctx->group_commit_mode |
261 | 2.07k | << ", HTTP headers=" << req->get_all_headers(); |
262 | 2.07k | ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); |
263 | | |
264 | 2.07k | if (st.ok()) { |
265 | 2.07k | st = _on_header(req, ctx); |
266 | 2.07k | LOG(INFO) << "finished to handle HTTP header, " << ctx->brief(); |
267 | 2.07k | } |
268 | 2.07k | if (!st.ok()) { |
269 | 180 | ctx->status = std::move(st); |
270 | 180 | { |
271 | 180 | std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock); |
272 | 180 | ctx->_can_send_reply = true; |
273 | 180 | ctx->_can_send_reply_cv.notify_all(); |
274 | 180 | } |
275 | 180 | _send_reply(ctx, req); |
276 | 180 | return -1; |
277 | 180 | } |
278 | 1.89k | return 0; |
279 | 2.07k | } |
280 | | |
281 | 2.07k | Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { |
282 | | // auth information |
283 | 2.07k | if (!parse_basic_auth(*http_req, &ctx->auth)) { |
284 | 0 | LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); |
285 | 0 | return Status::NotAuthorized("no valid Basic authorization"); |
286 | 0 | } |
287 | | |
288 | | // get format of this put |
289 | 2.07k | std::string format_str = http_req->header(HTTP_FORMAT_KEY); |
290 | 2.07k | if (iequal(format_str, BeConsts::CSV_WITH_NAMES) || |
291 | 2.07k | iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) { |
292 | 8 | ctx->header_type = format_str; |
293 | | //treat as CSV |
294 | 8 | format_str = BeConsts::CSV; |
295 | 8 | } |
296 | 2.07k | LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format, |
297 | 2.07k | &ctx->compress_type); |
298 | 2.07k | if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) { |
299 | 1 | return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}", |
300 | 1 | http_req->header(HTTP_FORMAT_KEY)); |
301 | 1 | } |
302 | | |
303 | | // check content length |
304 | 2.07k | ctx->body_bytes = 0; |
305 | 2.07k | size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; |
306 | 2.07k | size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024; |
307 | 2.07k | bool read_json_by_line = false; |
308 | 2.07k | if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { |
309 | 329 | if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { |
310 | 327 | read_json_by_line = true; |
311 | 327 | } |
312 | 329 | } |
313 | 2.07k | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
314 | 1.76k | try { |
315 | 1.76k | ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
316 | 1.76k | } catch (const std::exception& e) { |
317 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
318 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); |
319 | 0 | } |
320 | | // json max body size |
321 | 1.76k | if ((ctx->format == TFileFormatType::FORMAT_JSON) && |
322 | 1.76k | (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) { |
323 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
324 | 0 | "json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase " |
325 | 0 | "it if you are sure this load is reasonable", |
326 | 0 | ctx->body_bytes, json_max_body_bytes); |
327 | 0 | } |
328 | | // csv max body size |
329 | 1.76k | else if (ctx->body_bytes > csv_max_body_bytes) { |
330 | 0 | LOG(WARNING) << "body exceed max size." << ctx->brief(); |
331 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
332 | 0 | "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you " |
333 | 0 | "are sure this load is reasonable", |
334 | 0 | ctx->body_bytes, csv_max_body_bytes); |
335 | 0 | } |
336 | 1.76k | } else { |
337 | 314 | #ifndef BE_TEST |
338 | 314 | evhttp_connection_set_max_body_size( |
339 | 314 | evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes); |
340 | 314 | #endif |
341 | 314 | } |
342 | | |
343 | 2.07k | if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) { |
344 | 314 | if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) { |
345 | 314 | ctx->is_chunked_transfer = true; |
346 | 314 | } |
347 | 314 | } |
348 | 2.07k | if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && |
349 | 2.07k | !ctx->is_chunked_transfer))) { |
350 | 1 | LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set " |
351 | 1 | "content_length or transfer-encoding=chunked"; |
352 | 1 | return Status::InvalidArgument( |
353 | 1 | "content_length is empty and transfer-encoding!=chunked, please set content_length " |
354 | 1 | "or transfer-encoding=chunked"); |
355 | 2.07k | } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() && |
356 | 2.07k | ctx->is_chunked_transfer)) { |
357 | 1 | LOG(WARNING) << "please do not set both content_length and transfer-encoding"; |
358 | 1 | return Status::InvalidArgument( |
359 | 1 | "please do not set both content_length and transfer-encoding"); |
360 | 1 | } |
361 | | |
362 | 2.07k | if (!http_req->header(HTTP_TIMEOUT).empty()) { |
363 | 70 | ctx->timeout_second = DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT)); |
364 | 69 | } |
365 | 2.07k | if (!http_req->header(HTTP_COMMENT).empty()) { |
366 | 1 | ctx->load_comment = http_req->header(HTTP_COMMENT); |
367 | 1 | } |
368 | | // begin transaction |
369 | 2.07k | if (!ctx->group_commit) { |
370 | 2.05k | int64_t begin_txn_start_time = MonotonicNanos(); |
371 | 2.05k | RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get())); |
372 | 2.02k | ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; |
373 | 2.02k | if (ctx->group_commit) { |
374 | 2 | RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes)); |
375 | 2 | } |
376 | 2.02k | } |
377 | | |
378 | | // process put file |
379 | 2.04k | return _process_put(http_req, ctx); |
380 | 2.07k | } |
381 | | |
382 | 315k | void StreamLoadAction::on_chunk_data(HttpRequest* req) { |
383 | 315k | std::shared_ptr<StreamLoadContext> ctx = |
384 | 315k | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
385 | 315k | if (ctx == nullptr || !ctx->status.ok()) { |
386 | 20.7k | return; |
387 | 20.7k | } |
388 | | |
389 | 294k | struct evhttp_request* ev_req = req->get_evhttp_request(); |
390 | 294k | auto evbuf = evhttp_request_get_input_buffer(ev_req); |
391 | | |
392 | 294k | SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
393 | | |
394 | 294k | int64_t start_read_data_time = MonotonicNanos(); |
395 | 589k | while (evbuffer_get_length(evbuf) > 0) { |
396 | 294k | ByteBufferPtr bb; |
397 | 294k | Status st = ByteBuffer::allocate(128 * 1024, &bb); |
398 | 294k | if (!st.ok()) { |
399 | 0 | ctx->status = st; |
400 | 0 | return; |
401 | 0 | } |
402 | 294k | auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); |
403 | 294k | bb->pos = remove_bytes; |
404 | 294k | bb->flip(); |
405 | 294k | st = ctx->body_sink->append(bb); |
406 | 294k | if (!st.ok()) { |
407 | 3 | LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); |
408 | 3 | ctx->status = st; |
409 | 3 | return; |
410 | 3 | } |
411 | 294k | ctx->receive_bytes += remove_bytes; |
412 | 294k | } |
413 | 294k | int64_t read_data_time = MonotonicNanos() - start_read_data_time; |
414 | 294k | int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos; |
415 | 294k | ctx->read_data_cost_nanos += read_data_time; |
416 | 294k | ctx->receive_and_read_data_cost_nanos = |
417 | 294k | MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos; |
418 | 294k | g_stream_load_receive_data_latency_ms |
419 | 294k | << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos - |
420 | 294k | read_data_time) / |
421 | 294k | 1000000; |
422 | 294k | } |
423 | | |
424 | 2.07k | void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) { |
425 | 2.07k | std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param); |
426 | 2.07k | if (ctx == nullptr) { |
427 | 0 | return; |
428 | 0 | } |
429 | | // sender is gone, make receiver know it |
430 | 2.07k | if (ctx->body_sink != nullptr) { |
431 | 2.03k | ctx->body_sink->cancel("sender is gone"); |
432 | 2.03k | } |
433 | | // remove stream load context from stream load manager and the resource will be released |
434 | 2.07k | ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); |
435 | 2.07k | streaming_load_current_processing->increment(-1); |
436 | 2.07k | } |
437 | | |
438 | | Status StreamLoadAction::_process_put(HttpRequest* http_req, |
439 | 2.04k | std::shared_ptr<StreamLoadContext> ctx) { |
440 | | // Now we use stream |
441 | 2.04k | ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format); |
442 | | |
443 | | // put request |
444 | 2.04k | TStreamLoadPutRequest request; |
445 | 2.04k | set_request_auth(&request, ctx->auth); |
446 | 2.04k | request.db = ctx->db; |
447 | 2.04k | request.tbl = ctx->table; |
448 | 2.04k | request.txnId = ctx->txn_id; |
449 | 2.04k | request.formatType = ctx->format; |
450 | 2.04k | request.__set_compress_type(ctx->compress_type); |
451 | 2.04k | request.__set_header_type(ctx->header_type); |
452 | 2.04k | request.__set_loadId(ctx->id.to_thrift()); |
453 | 2.04k | if (ctx->use_streaming) { |
454 | 2.02k | std::shared_ptr<io::StreamLoadPipe> pipe; |
455 | 2.02k | if (ctx->is_chunked_transfer) { |
456 | 313 | pipe = std::make_shared<io::StreamLoadPipe>( |
457 | 313 | io::kMaxPipeBufferedBytes /* max_buffered_bytes */); |
458 | 313 | pipe->set_is_chunked_transfer(true); |
459 | 1.71k | } else { |
460 | 1.71k | pipe = std::make_shared<io::StreamLoadPipe>( |
461 | 1.71k | io::kMaxPipeBufferedBytes /* max_buffered_bytes */, |
462 | 1.71k | MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */); |
463 | 1.71k | } |
464 | 2.02k | request.fileType = TFileType::FILE_STREAM; |
465 | 2.02k | ctx->body_sink = pipe; |
466 | 2.02k | ctx->pipe = pipe; |
467 | 2.02k | RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); |
468 | 2.02k | } else { |
469 | 18 | RETURN_IF_ERROR(_data_saved_path(http_req, &request.path, ctx->body_bytes)); |
470 | 18 | auto file_sink = std::make_shared<MessageBodyFileSink>(request.path); |
471 | 18 | RETURN_IF_ERROR(file_sink->open()); |
472 | 18 | request.__isset.path = true; |
473 | 18 | request.fileType = TFileType::FILE_LOCAL; |
474 | 18 | request.__set_file_size(ctx->body_bytes); |
475 | 18 | ctx->body_sink = file_sink; |
476 | 18 | ctx->data_saved_path = request.path; |
477 | 18 | } |
478 | 2.04k | if (!http_req->header(HTTP_COLUMNS).empty()) { |
479 | 1.01k | request.__set_columns(http_req->header(HTTP_COLUMNS)); |
480 | 1.01k | } |
481 | 2.04k | if (!http_req->header(HTTP_WHERE).empty()) { |
482 | 10 | request.__set_where(http_req->header(HTTP_WHERE)); |
483 | 10 | } |
484 | 2.04k | if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) { |
485 | 1.43k | request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR)); |
486 | 1.43k | } |
487 | 2.04k | if (!http_req->header(HTTP_LINE_DELIMITER).empty()) { |
488 | 41 | request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER)); |
489 | 41 | } |
490 | 2.04k | if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) { |
491 | 80 | const auto& enclose_str = http_req->header(HTTP_ENCLOSE); |
492 | 80 | if (enclose_str.length() != 1) { |
493 | 0 | return Status::InvalidArgument("enclose must be single-char, actually is {}", |
494 | 0 | enclose_str); |
495 | 0 | } |
496 | 80 | request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]); |
497 | 80 | } |
498 | 2.04k | if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) { |
499 | 80 | const auto& escape_str = http_req->header(HTTP_ESCAPE); |
500 | 80 | if (escape_str.length() != 1) { |
501 | 0 | return Status::InvalidArgument("escape must be single-char, actually is {}", |
502 | 0 | escape_str); |
503 | 0 | } |
504 | 80 | request.__set_escape(http_req->header(HTTP_ESCAPE)[0]); |
505 | 80 | } |
506 | 2.04k | if (!http_req->header(HTTP_PARTITIONS).empty()) { |
507 | 15 | request.__set_partitions(http_req->header(HTTP_PARTITIONS)); |
508 | 15 | request.__set_isTempPartition(false); |
509 | 15 | if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) { |
510 | 0 | return Status::InvalidArgument( |
511 | 0 | "Can not specify both partitions and temporary partitions"); |
512 | 0 | } |
513 | 15 | } |
514 | 2.04k | if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) { |
515 | 1 | request.__set_partitions(http_req->header(HTTP_TEMP_PARTITIONS)); |
516 | 1 | request.__set_isTempPartition(true); |
517 | 1 | if (!http_req->header(HTTP_PARTITIONS).empty()) { |
518 | 0 | return Status::InvalidArgument( |
519 | 0 | "Can not specify both partitions and temporary partitions"); |
520 | 0 | } |
521 | 1 | } |
522 | 2.04k | if (!http_req->header(HTTP_NEGATIVE).empty() && http_req->header(HTTP_NEGATIVE) == "true") { |
523 | 0 | request.__set_negative(true); |
524 | 2.04k | } else { |
525 | 2.04k | request.__set_negative(false); |
526 | 2.04k | } |
527 | 2.04k | bool strictMode = false; |
528 | 2.04k | if (!http_req->header(HTTP_STRICT_MODE).empty()) { |
529 | 348 | if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) { |
530 | 217 | strictMode = false; |
531 | 217 | } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) { |
532 | 131 | strictMode = true; |
533 | 131 | } else { |
534 | 0 | return Status::InvalidArgument("Invalid strict mode format. Must be bool type"); |
535 | 0 | } |
536 | 348 | request.__set_strictMode(strictMode); |
537 | 348 | } |
538 | | // timezone first. if not, try system time_zone |
539 | 2.04k | if (!http_req->header(HTTP_TIMEZONE).empty()) { |
540 | 17 | request.__set_timezone(http_req->header(HTTP_TIMEZONE)); |
541 | 2.03k | } else if (!http_req->header(HTTP_TIME_ZONE).empty()) { |
542 | 0 | request.__set_timezone(http_req->header(HTTP_TIME_ZONE)); |
543 | 0 | } |
544 | 2.04k | if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) { |
545 | 22 | try { |
546 | 22 | request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT))); |
547 | 22 | } catch (const std::invalid_argument& e) { |
548 | 7 | return Status::InvalidArgument("Invalid mem limit format, {}", e.what()); |
549 | 7 | } |
550 | 22 | } |
551 | 2.04k | if (!http_req->header(HTTP_JSONPATHS).empty()) { |
552 | 27 | request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS)); |
553 | 27 | } |
554 | 2.04k | if (!http_req->header(HTTP_JSONROOT).empty()) { |
555 | 7 | request.__set_json_root(http_req->header(HTTP_JSONROOT)); |
556 | 7 | } |
557 | 2.04k | if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) { |
558 | 66 | if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) { |
559 | 60 | request.__set_strip_outer_array(true); |
560 | 60 | } else { |
561 | 6 | request.__set_strip_outer_array(false); |
562 | 6 | } |
563 | 1.97k | } else { |
564 | 1.97k | request.__set_strip_outer_array(false); |
565 | 1.97k | } |
566 | | |
567 | 2.04k | if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { |
568 | 322 | if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { |
569 | 320 | request.__set_read_json_by_line(true); |
570 | 320 | } else { |
571 | 2 | request.__set_read_json_by_line(false); |
572 | 2 | } |
573 | 1.71k | } else { |
574 | 1.71k | request.__set_read_json_by_line(false); |
575 | 1.71k | } |
576 | | |
577 | 2.04k | if (http_req->header(HTTP_READ_JSON_BY_LINE).empty() && |
578 | 2.04k | http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) { |
579 | 1.66k | request.__set_read_json_by_line(true); |
580 | 1.66k | request.__set_strip_outer_array(false); |
581 | 1.66k | } |
582 | | |
583 | 2.04k | if (!http_req->header(HTTP_NUM_AS_STRING).empty()) { |
584 | 2 | if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) { |
585 | 1 | request.__set_num_as_string(true); |
586 | 1 | } else { |
587 | 1 | request.__set_num_as_string(false); |
588 | 1 | } |
589 | 2.03k | } else { |
590 | 2.03k | request.__set_num_as_string(false); |
591 | 2.03k | } |
592 | 2.04k | if (!http_req->header(HTTP_FUZZY_PARSE).empty()) { |
593 | 9 | if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { |
594 | 8 | request.__set_fuzzy_parse(true); |
595 | 8 | } else { |
596 | 1 | request.__set_fuzzy_parse(false); |
597 | 1 | } |
598 | 2.03k | } else { |
599 | 2.03k | request.__set_fuzzy_parse(false); |
600 | 2.03k | } |
601 | | |
602 | 2.04k | if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { |
603 | 90 | request.__set_sequence_col( |
604 | 90 | http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL)); |
605 | 90 | } |
606 | | |
607 | 2.04k | if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) { |
608 | 4 | int parallelism = DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM), |
609 | 2 | HTTP_SEND_BATCH_PARALLELISM)); |
610 | 2 | request.__set_send_batch_parallelism(parallelism); |
611 | 2 | } |
612 | | |
613 | 2.03k | if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) { |
614 | 8 | if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) { |
615 | 7 | request.__set_load_to_single_tablet(true); |
616 | 7 | } else { |
617 | 1 | request.__set_load_to_single_tablet(false); |
618 | 1 | } |
619 | 8 | } |
620 | | |
621 | 2.03k | if (ctx->timeout_second != -1) { |
622 | 69 | request.__set_timeout(ctx->timeout_second); |
623 | 69 | } |
624 | 2.03k | request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms); |
625 | 2.03k | TMergeType::type merge_type = TMergeType::APPEND; |
626 | 2.03k | StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND}, |
627 | 2.03k | {"DELETE", TMergeType::DELETE}, |
628 | 2.03k | {"MERGE", TMergeType::MERGE}}; |
629 | 2.03k | if (!http_req->header(HTTP_MERGE_TYPE).empty()) { |
630 | 75 | std::string merge_type_str = http_req->header(HTTP_MERGE_TYPE); |
631 | 75 | auto iter = merge_type_map.find(merge_type_str); |
632 | 75 | if (iter != merge_type_map.end()) { |
633 | 74 | merge_type = iter->second; |
634 | 74 | } else { |
635 | 1 | return Status::InvalidArgument("Invalid merge type {}", merge_type_str); |
636 | 1 | } |
637 | 74 | if (merge_type == TMergeType::MERGE && http_req->header(HTTP_DELETE_CONDITION).empty()) { |
638 | 0 | return Status::InvalidArgument("Excepted DELETE ON clause when merge type is MERGE."); |
639 | 74 | } else if (merge_type != TMergeType::MERGE && |
640 | 74 | !http_req->header(HTTP_DELETE_CONDITION).empty()) { |
641 | 0 | return Status::InvalidArgument( |
642 | 0 | "Not support DELETE ON clause when merge type is not MERGE."); |
643 | 0 | } |
644 | 74 | } |
645 | 2.03k | request.__set_merge_type(merge_type); |
646 | 2.03k | if (!http_req->header(HTTP_DELETE_CONDITION).empty()) { |
647 | 17 | request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION)); |
648 | 17 | } |
649 | | |
650 | 2.03k | if (!http_req->header(HTTP_MAX_FILTER_RATIO).empty()) { |
651 | 216 | ctx->max_filter_ratio = strtod(http_req->header(HTTP_MAX_FILTER_RATIO).c_str(), nullptr); |
652 | 216 | request.__set_max_filter_ratio(ctx->max_filter_ratio); |
653 | 216 | } |
654 | | |
655 | 2.03k | if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { |
656 | 6 | request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS)); |
657 | 6 | } |
658 | 2.03k | if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) { |
659 | 89 | if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) { |
660 | 25 | request.__set_trim_double_quotes(true); |
661 | 64 | } else { |
662 | 64 | request.__set_trim_double_quotes(false); |
663 | 64 | } |
664 | 89 | } |
665 | 2.03k | if (!http_req->header(HTTP_SKIP_LINES).empty()) { |
666 | 81 | int skip_lines = DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES)); |
667 | 81 | if (skip_lines < 0) { |
668 | 1 | return Status::InvalidArgument("Invalid 'skip_lines': {}", skip_lines); |
669 | 1 | } |
670 | 80 | request.__set_skip_lines(skip_lines); |
671 | 80 | } |
672 | 2.03k | if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) { |
673 | 0 | if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) { |
674 | 0 | request.__set_enable_profile(true); |
675 | 0 | } else { |
676 | 0 | request.__set_enable_profile(false); |
677 | 0 | } |
678 | 0 | } |
679 | | |
680 | 2.03k | if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) { |
681 | 170 | static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = { |
682 | 170 | {"UPSERT", TUniqueKeyUpdateMode::UPSERT}, |
683 | 170 | {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS}, |
684 | 170 | {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}}; |
685 | 170 | std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE); |
686 | 170 | auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str); |
687 | 170 | if (iter != unique_key_update_mode_map.end()) { |
688 | 169 | TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second; |
689 | 169 | if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) { |
690 | | // check constraints when flexible partial update is enabled |
691 | 169 | if (ctx->format != TFileFormatType::FORMAT_JSON) { |
692 | 1 | return Status::InvalidArgument( |
693 | 1 | "flexible partial update only support json format as input file " |
694 | 1 | "currently"); |
695 | 1 | } |
696 | 168 | if (!http_req->header(HTTP_FUZZY_PARSE).empty() && |
697 | 168 | iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { |
698 | 1 | return Status::InvalidArgument( |
699 | 1 | "Don't support flexible partial update when 'fuzzy_parse' is enabled"); |
700 | 1 | } |
701 | 167 | if (!http_req->header(HTTP_COLUMNS).empty()) { |
702 | 1 | return Status::InvalidArgument( |
703 | 1 | "Don't support flexible partial update when 'columns' is specified"); |
704 | 1 | } |
705 | 166 | if (!http_req->header(HTTP_JSONPATHS).empty()) { |
706 | 1 | return Status::InvalidArgument( |
707 | 1 | "Don't support flexible partial update when 'jsonpaths' is specified"); |
708 | 1 | } |
709 | 165 | if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) { |
710 | 1 | return Status::InvalidArgument( |
711 | 1 | "Don't support flexible partial update when 'hidden_columns' is " |
712 | 1 | "specified"); |
713 | 1 | } |
714 | 164 | if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { |
715 | 0 | return Status::InvalidArgument( |
716 | 0 | "Don't support flexible partial update when " |
717 | 0 | "'function_column.sequence_col' is specified"); |
718 | 0 | } |
719 | 164 | if (!http_req->header(HTTP_MERGE_TYPE).empty()) { |
720 | 1 | return Status::InvalidArgument( |
721 | 1 | "Don't support flexible partial update when " |
722 | 1 | "'merge_type' is specified"); |
723 | 1 | } |
724 | 163 | if (!http_req->header(HTTP_WHERE).empty()) { |
725 | 1 | return Status::InvalidArgument( |
726 | 1 | "Don't support flexible partial update when " |
727 | 1 | "'where' is specified"); |
728 | 1 | } |
729 | 163 | } |
730 | 162 | request.__set_unique_key_update_mode(unique_key_update_mode); |
731 | 162 | } else { |
732 | 1 | return Status::InvalidArgument( |
733 | 1 | "Invalid unique_key_partial_mode {}, must be one of 'UPSERT', " |
734 | 1 | "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'", |
735 | 1 | unique_key_update_mode_str); |
736 | 1 | } |
737 | 170 | } |
738 | | |
739 | 2.02k | if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() && |
740 | 2.02k | !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) { |
741 | | // only consider `partial_columns` parameter when `unique_key_update_mode` is not set |
742 | 338 | if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) { |
743 | 333 | request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS); |
744 | | // for backward compatibility |
745 | 333 | request.__set_partial_update(true); |
746 | 333 | } |
747 | 338 | } |
748 | | |
749 | 2.02k | if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) { |
750 | 18 | static const std::map<std::string, TPartialUpdateNewRowPolicy::type> policy_map { |
751 | 18 | {"APPEND", TPartialUpdateNewRowPolicy::APPEND}, |
752 | 18 | {"ERROR", TPartialUpdateNewRowPolicy::ERROR}}; |
753 | | |
754 | 18 | auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY); |
755 | 18 | std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(), |
756 | 92 | [](unsigned char c) { return std::toupper(c); }); |
757 | 18 | auto it = policy_map.find(policy_name); |
758 | 18 | if (it == policy_map.end()) { |
759 | 0 | return Status::InvalidArgument( |
760 | 0 | "Invalid partial_update_new_key_behavior {}, must be one of {'APPEND', " |
761 | 0 | "'ERROR'}", |
762 | 0 | policy_name); |
763 | 0 | } |
764 | 18 | request.__set_partial_update_new_key_policy(it->second); |
765 | 18 | } |
766 | | |
767 | 2.02k | if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { |
768 | 52 | bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); |
769 | 52 | request.__set_memtable_on_sink_node(value); |
770 | 52 | } |
771 | 2.02k | if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) { |
772 | 0 | int stream_per_node = DORIS_TRY( |
773 | 0 | safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE), HTTP_LOAD_STREAM_PER_NODE)); |
774 | 0 | request.__set_stream_per_node(stream_per_node); |
775 | 0 | } |
776 | 2.02k | if (ctx->group_commit) { |
777 | 24 | request.__set_group_commit_mode(ctx->group_commit_mode); |
778 | 24 | } |
779 | | |
780 | 2.02k | if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) { |
781 | 0 | request.__set_cloud_cluster(http_req->header(HTTP_COMPUTE_GROUP)); |
782 | 2.02k | } else if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) { |
783 | 66 | request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER)); |
784 | 66 | } |
785 | | |
786 | 2.02k | if (!http_req->header(HTTP_EMPTY_FIELD_AS_NULL).empty()) { |
787 | 1 | if (iequal(http_req->header(HTTP_EMPTY_FIELD_AS_NULL), "true")) { |
788 | 1 | request.__set_empty_field_as_null(true); |
789 | 1 | } |
790 | 1 | } |
791 | | |
792 | 2.02k | #ifndef BE_TEST |
793 | | // plan this load |
794 | 2.02k | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
795 | 2.02k | int64_t stream_load_put_start_time = MonotonicNanos(); |
796 | 2.02k | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
797 | 2.02k | master_addr.hostname, master_addr.port, |
798 | 2.02k | [&request, ctx](FrontendServiceConnection& client) { |
799 | 2.02k | client->streamLoadPut(ctx->put_result, request); |
800 | 2.02k | })); |
801 | 2.02k | ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; |
802 | | #else |
803 | | ctx->put_result = k_stream_load_put_result; |
804 | | #endif |
805 | 2.02k | Status plan_status(Status::create(ctx->put_result.status)); |
806 | 2.02k | if (!plan_status.ok()) { |
807 | 130 | LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); |
808 | 130 | return plan_status; |
809 | 130 | } |
810 | 2.02k | DCHECK(ctx->put_result.__isset.pipeline_params); |
811 | 1.89k | ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false); |
812 | 1.89k | ctx->put_result.pipeline_params.query_options.__set_enable_insert_strict(strictMode); |
813 | 1.89k | if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { |
814 | 1 | return Status::NotSupported("stream load 2pc is unsupported for mow table"); |
815 | 1 | } |
816 | 1.89k | if (iequal(ctx->group_commit_mode, ASYNC_MODE)) { |
817 | | // FIXME find a way to avoid chunked stream load write large WALs |
818 | 20 | size_t content_length = 0; |
819 | 20 | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
820 | 18 | try { |
821 | 18 | content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
822 | 18 | } catch (const std::exception& e) { |
823 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
824 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), |
825 | 0 | e.what()); |
826 | 0 | } |
827 | 18 | if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || |
828 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_LZO || |
829 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || |
830 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || |
831 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_LZOP || |
832 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || |
833 | 18 | ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { |
834 | 4 | content_length *= 3; |
835 | 4 | } |
836 | 18 | } |
837 | 20 | ctx->put_result.pipeline_params.__set_content_length(content_length); |
838 | 20 | } |
839 | | |
840 | 1.89k | VLOG_NOTICE << "params is " |
841 | 0 | << apache::thrift::ThriftDebugString(ctx->put_result.pipeline_params); |
842 | | // if we not use streaming, we must download total content before we begin |
843 | | // to process this load |
844 | 1.89k | if (!ctx->use_streaming) { |
845 | 17 | return Status::OK(); |
846 | 17 | } |
847 | | |
848 | 1.88k | TPipelineFragmentParamsList mocked; |
849 | 1.88k | return _exec_env->stream_load_executor()->execute_plan_fragment( |
850 | 1.88k | ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) { |
851 | 1.88k | _on_finish(ctx, http_req); |
852 | 1.88k | }); |
853 | 1.89k | } |
854 | | |
855 | | Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path, |
856 | 18 | int64_t file_bytes) { |
857 | 18 | std::string prefix; |
858 | 18 | RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &prefix, |
859 | 18 | file_bytes)); |
860 | 18 | timeval tv; |
861 | 18 | gettimeofday(&tv, nullptr); |
862 | 18 | struct tm tm; |
863 | 18 | time_t cur_sec = tv.tv_sec; |
864 | 18 | localtime_r(&cur_sec, &tm); |
865 | 18 | char buf[64]; |
866 | 18 | strftime(buf, 64, "%Y%m%d%H%M%S", &tm); |
867 | 18 | std::stringstream ss; |
868 | 18 | ss << prefix << "/" << req->param(HTTP_TABLE_KEY) << "." << buf << "." << tv.tv_usec; |
869 | 18 | *file_path = ss.str(); |
870 | 18 | return Status::OK(); |
871 | 18 | } |
872 | | |
873 | | void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, |
874 | 2.02k | const std::string& str) { |
875 | 2.02k | std::shared_ptr<StreamLoadRecorder> stream_load_recorder = |
876 | 2.02k | ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); |
877 | | |
878 | 2.02k | if (stream_load_recorder != nullptr) { |
879 | 2.02k | std::string key = |
880 | 2.02k | std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; |
881 | 2.02k | auto st = stream_load_recorder->put(key, str); |
882 | 2.02k | if (st.ok()) { |
883 | 2.02k | LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label |
884 | 2.02k | << ", key: " << key; |
885 | 2.02k | } |
886 | 18.4E | } else { |
887 | 18.4E | LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null."; |
888 | 18.4E | } |
889 | 2.02k | } |
890 | | |
891 | | Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode, |
892 | 1.50k | int64_t content_length) { |
893 | 1.50k | if (iequal(group_commit_mode, ASYNC_MODE) && |
894 | 1.50k | !load_size_smaller_than_wal_limit(content_length)) { |
895 | 0 | std::stringstream ss; |
896 | 0 | ss << "There is no space for group commit stream load async WAL. This stream load " |
897 | 0 | "size is " |
898 | 0 | << content_length |
899 | 0 | << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); |
900 | 0 | LOG(WARNING) << ss.str(); |
901 | 0 | return Status::Error<EXCEEDED_LIMIT>(ss.str()); |
902 | 0 | } |
903 | 1.50k | return Status::OK(); |
904 | 1.50k | } |
905 | | |
906 | | Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx, |
907 | | std::string& group_commit_header, |
908 | 2.07k | bool& can_group_commit) { |
909 | 2.07k | int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() |
910 | 2.07k | ? 0 |
911 | 2.07k | : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); |
912 | 2.07k | if (content_length < 0) { |
913 | 0 | std::stringstream ss; |
914 | 0 | ss << "This stream load content length <0 (" << content_length |
915 | 0 | << "), please check your content length."; |
916 | 0 | LOG(WARNING) << ss.str(); |
917 | 0 | return Status::InvalidArgument(ss.str()); |
918 | 0 | } |
919 | 2.07k | auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && |
920 | 2.07k | req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos; |
921 | 2.07k | if (content_length == 0 && !is_chunk) { |
922 | | // off_mode and empty |
923 | 7 | can_group_commit = false; |
924 | 7 | return Status::OK(); |
925 | 7 | } |
926 | 2.06k | if (is_chunk) { |
927 | 314 | ctx->label = ""; |
928 | 314 | } |
929 | | |
930 | 2.06k | auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && |
931 | 2.06k | iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); |
932 | 2.06k | auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); |
933 | 2.06k | auto partitions = !req->header(HTTP_PARTITIONS).empty(); |
934 | 2.06k | auto update_mode = |
935 | 2.06k | !req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() && |
936 | 2.06k | (iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FIXED_COLUMNS") || |
937 | 170 | iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS")); |
938 | 2.06k | if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && |
939 | 2.06k | !update_mode) { |
940 | 1.49k | if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() && |
941 | 1.49k | !ctx->label.empty()) { |
942 | 1 | return Status::InvalidArgument("label and group_commit can't be set at the same time"); |
943 | 1 | } |
944 | 1.49k | RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length)); |
945 | 1.49k | can_group_commit = true; |
946 | 1.49k | } |
947 | 2.06k | return Status::OK(); |
948 | 2.06k | } |
949 | | |
950 | | Status StreamLoadAction::_handle_group_commit(HttpRequest* req, |
951 | 2.07k | std::shared_ptr<StreamLoadContext> ctx) { |
952 | 2.07k | std::string group_commit_header = req->header(HTTP_GROUP_COMMIT); |
953 | 2.07k | if (!group_commit_header.empty() && !iequal(group_commit_header, SYNC_MODE) && |
954 | 2.07k | !iequal(group_commit_header, ASYNC_MODE) && !iequal(group_commit_header, OFF_MODE)) { |
955 | 0 | return Status::InvalidArgument( |
956 | 0 | "group_commit can only be [async_mode, sync_mode, off_mode]"); |
957 | 0 | } |
958 | 2.07k | if (config::wait_internal_group_commit_finish) { |
959 | 0 | group_commit_header = SYNC_MODE; |
960 | 0 | } |
961 | | |
962 | | // if group_commit_header is off_mode, we will not use group commit |
963 | 2.07k | if (iequal(group_commit_header, OFF_MODE)) { |
964 | 1 | ctx->group_commit_mode = OFF_MODE; |
965 | 1 | ctx->group_commit = false; |
966 | 1 | return Status::OK(); |
967 | 1 | } |
968 | 2.07k | bool can_group_commit = false; |
969 | 2.07k | RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, can_group_commit)); |
970 | 2.07k | if (!can_group_commit) { |
971 | 577 | ctx->group_commit_mode = OFF_MODE; |
972 | 577 | ctx->group_commit = false; |
973 | 1.49k | } else { |
974 | 1.49k | if (!group_commit_header.empty()) { |
975 | 22 | ctx->group_commit_mode = group_commit_header; |
976 | 22 | ctx->group_commit = true; |
977 | 1.47k | } else { |
978 | | // use table property to decide group commit or not |
979 | 1.47k | ctx->group_commit_mode = ""; |
980 | 1.47k | ctx->group_commit = false; |
981 | 1.47k | } |
982 | 1.49k | } |
983 | 2.07k | return Status::OK(); |
984 | 2.07k | } |
985 | | |
986 | | } // namespace doris |