be/src/service/http/action/http_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/http_stream.h" |
19 | | |
20 | | #include <cstddef> |
21 | | #include <future> |
22 | | #include <sstream> |
23 | | |
24 | | // use string iequal |
25 | | #include <event2/buffer.h> |
26 | | #include <event2/bufferevent.h> |
27 | | #include <event2/http.h> |
28 | | #include <gen_cpp/FrontendService.h> |
29 | | #include <gen_cpp/FrontendService_types.h> |
30 | | #include <gen_cpp/HeartbeatService_types.h> |
31 | | #include <rapidjson/prettywriter.h> |
32 | | #include <thrift/protocol/TDebugProtocol.h> |
33 | | |
34 | | #include "cloud/config.h" |
35 | | #include "common/config.h" |
36 | | #include "common/logging.h" |
37 | | #include "common/metrics/doris_metrics.h" |
38 | | #include "common/metrics/metrics.h" |
39 | | #include "common/status.h" |
40 | | #include "common/utils.h" |
41 | | #include "io/fs/stream_load_pipe.h" |
42 | | #include "load/group_commit/group_commit_mgr.h" |
43 | | #include "load/load_path_mgr.h" |
44 | | #include "load/stream_load/new_load_stream_mgr.h" |
45 | | #include "load/stream_load/stream_load_context.h" |
46 | | #include "load/stream_load/stream_load_executor.h" |
47 | | #include "load/stream_load/stream_load_recorder.h" |
48 | | #include "runtime/exec_env.h" |
49 | | #include "runtime/fragment_mgr.h" |
50 | | #include "service/http/http_channel.h" |
51 | | #include "service/http/http_common.h" |
52 | | #include "service/http/http_headers.h" |
53 | | #include "service/http/http_request.h" |
54 | | #include "service/http/utils.h" |
55 | | #include "storage/storage_engine.h" |
56 | | #include "util/byte_buffer.h" |
57 | | #include "util/client_cache.h" |
58 | | #include "util/string_util.h" |
59 | | #include "util/thrift_rpc_helper.h" |
60 | | #include "util/time.h" |
61 | | #include "util/uid_util.h" |
62 | | |
63 | | namespace doris { |
64 | | using namespace ErrorCode; |
65 | | |
66 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS); |
67 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS); |
68 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS); |
69 | | |
70 | 8 | HttpStreamAction::HttpStreamAction(ExecEnv* exec_env) : _exec_env(exec_env) { |
71 | 8 | _http_stream_entity = |
72 | 8 | DorisMetrics::instance()->metric_registry()->register_entity("http_stream"); |
73 | 8 | INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_requests_total); |
74 | 8 | INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_duration_ms); |
75 | 8 | INT_GAUGE_METRIC_REGISTER(_http_stream_entity, http_stream_current_processing); |
76 | 8 | } |
77 | | |
78 | 4 | HttpStreamAction::~HttpStreamAction() { |
79 | 4 | DorisMetrics::instance()->metric_registry()->deregister_entity(_http_stream_entity); |
80 | 4 | } |
81 | | |
82 | 134 | void HttpStreamAction::handle(HttpRequest* req) { |
83 | 134 | std::shared_ptr<StreamLoadContext> ctx = |
84 | 134 | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
85 | 134 | if (ctx == nullptr) { |
86 | 0 | return; |
87 | 0 | } |
88 | | |
89 | | // status already set to fail |
90 | 134 | if (ctx->status.ok()) { |
91 | 113 | ctx->status = _handle(req, ctx); |
92 | 113 | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
93 | 3 | LOG(WARNING) << "handle streaming load failed, id=" << ctx->id |
94 | 3 | << ", errmsg=" << ctx->status; |
95 | 3 | } |
96 | 113 | } |
97 | 134 | ctx->load_cost_millis = UnixMillis() - ctx->start_millis; |
98 | | |
99 | 134 | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
100 | 24 | if (ctx->body_sink != nullptr) { |
101 | 24 | ctx->body_sink->cancel(ctx->status.to_string()); |
102 | 24 | } |
103 | 24 | } |
104 | | |
105 | 134 | if (!ctx->status.ok()) { |
106 | 24 | auto str = std::string(ctx->to_json()); |
107 | | // add new line at end |
108 | 24 | str = str + '\n'; |
109 | 24 | HttpChannel::send_reply(req, str); |
110 | 24 | return; |
111 | 24 | } |
112 | 110 | auto str = std::string(ctx->to_json()); |
113 | | // add new line at end |
114 | 110 | str = str + '\n'; |
115 | 110 | HttpChannel::send_reply(req, str); |
116 | 110 | if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { |
117 | 110 | if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { |
118 | 110 | str = ctx->prepare_stream_load_record(str); |
119 | 110 | _save_stream_load_record(ctx, str); |
120 | 110 | } |
121 | 110 | } |
122 | | // update statistics |
123 | 110 | http_stream_requests_total->increment(1); |
124 | 110 | http_stream_duration_ms->increment(ctx->load_cost_millis); |
125 | 110 | } |
126 | | |
127 | 113 | Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { |
128 | 113 | if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { |
129 | 0 | LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes |
130 | 0 | << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; |
131 | 0 | return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes"); |
132 | 0 | } |
133 | 113 | RETURN_IF_ERROR(ctx->body_sink->finish()); |
134 | | |
135 | | // wait stream load finish |
136 | 113 | RETURN_IF_ERROR(ctx->load_status_future.get()); |
137 | | |
138 | 110 | if (ctx->group_commit) { |
139 | 12 | LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); |
140 | 12 | return Status::OK(); |
141 | 12 | } |
142 | | |
143 | 98 | if (ctx->two_phase_commit) { |
144 | 1 | int64_t pre_commit_start_time = MonotonicNanos(); |
145 | 1 | RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); |
146 | 1 | ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; |
147 | 97 | } else { |
148 | | // If put file success we need commit this load |
149 | 97 | int64_t commit_and_publish_start_time = MonotonicNanos(); |
150 | 97 | RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); |
151 | 97 | ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; |
152 | 97 | } |
153 | 98 | return Status::OK(); |
154 | 98 | } |
155 | | |
156 | 134 | int HttpStreamAction::on_header(HttpRequest* req) { |
157 | 134 | http_stream_current_processing->increment(1); |
158 | | |
159 | 134 | std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); |
160 | 134 | req->set_handler_ctx(ctx); |
161 | | |
162 | 134 | ctx->load_type = TLoadType::MANUL_LOAD; |
163 | 134 | ctx->load_src_type = TLoadSourceType::RAW; |
164 | 134 | ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; |
165 | 134 | Status st = _handle_group_commit(req, ctx); |
166 | | |
167 | 134 | LOG(INFO) << "new income streaming load request." << ctx->brief() |
168 | 134 | << " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit; |
169 | 134 | if (st.ok()) { |
170 | 134 | st = _on_header(req, ctx); |
171 | 134 | } |
172 | 134 | if (!st.ok()) { |
173 | 0 | ctx->status = std::move(st); |
174 | 0 | if (ctx->body_sink != nullptr) { |
175 | 0 | ctx->body_sink->cancel(ctx->status.to_string()); |
176 | 0 | } |
177 | 0 | auto str = ctx->to_json(); |
178 | | // add new line at end |
179 | 0 | str = str + '\n'; |
180 | 0 | HttpChannel::send_reply(req, str); |
181 | 0 | if (config::enable_stream_load_record || |
182 | 0 | config::enable_stream_load_record_to_audit_log_table) { |
183 | 0 | if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { |
184 | 0 | str = ctx->prepare_stream_load_record(str); |
185 | 0 | _save_stream_load_record(ctx, str); |
186 | 0 | } |
187 | 0 | } |
188 | 0 | return -1; |
189 | 0 | } |
190 | 134 | return 0; |
191 | 134 | } |
192 | | |
193 | 134 | Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { |
194 | | // auth information |
195 | 134 | if (!parse_basic_auth(*http_req, &ctx->auth)) { |
196 | 0 | LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); |
197 | 0 | return Status::NotAuthorized("no valid Basic authorization"); |
198 | 0 | } |
199 | | |
200 | | // TODO(zs) : need Need to request an FE to obtain information such as format |
201 | | // check content length |
202 | 134 | ctx->body_bytes = 0; |
203 | 134 | size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; |
204 | 134 | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
205 | 131 | try { |
206 | 131 | ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
207 | 131 | } catch (const std::exception& e) { |
208 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
209 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); |
210 | 0 | } |
211 | | // csv max body size |
212 | 131 | if (ctx->body_bytes > csv_max_body_bytes) { |
213 | 0 | LOG(WARNING) << "body exceed max size." << ctx->brief(); |
214 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
215 | 0 | "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you " |
216 | 0 | "are sure this load is reasonable", |
217 | 0 | ctx->body_bytes, csv_max_body_bytes); |
218 | 0 | } |
219 | 131 | } |
220 | | |
221 | 134 | auto pipe = std::make_shared<io::StreamLoadPipe>( |
222 | 134 | io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, |
223 | 134 | ctx->body_bytes /* total_length */); |
224 | 134 | ctx->body_sink = pipe; |
225 | 134 | ctx->pipe = pipe; |
226 | | |
227 | 134 | RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); |
228 | | |
229 | | // Here, transactions are set from fe's NativeInsertStmt. |
230 | | // TODO(zs) : How to support two_phase_commit |
231 | | |
232 | 134 | return Status::OK(); |
233 | 134 | } |
234 | | |
235 | 9.68k | void HttpStreamAction::on_chunk_data(HttpRequest* req) { |
236 | 9.68k | std::shared_ptr<StreamLoadContext> ctx = |
237 | 9.68k | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
238 | 9.68k | if (ctx == nullptr || !ctx->status.ok()) { |
239 | 8 | return; |
240 | 8 | } |
241 | 9.68k | if (!req->header(HTTP_WAL_ID_KY).empty()) { |
242 | 0 | ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY)); |
243 | 0 | } |
244 | 9.68k | struct evhttp_request* ev_req = req->get_evhttp_request(); |
245 | 9.68k | auto evbuf = evhttp_request_get_input_buffer(ev_req); |
246 | | |
247 | | // In HttpStreamAction::on_chunk_data |
248 | | // -> process_put |
249 | | // -> StreamLoadExecutor::execute_plan_fragment |
250 | | // -> exec_plan_fragment |
251 | | // , SCOPED_ATTACH_TASK will be called. |
252 | 9.68k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
253 | | |
254 | 9.68k | int64_t start_read_data_time = MonotonicNanos(); |
255 | 9.68k | Status st = ctx->allocate_schema_buffer(); |
256 | 9.68k | if (!st.ok()) { |
257 | 0 | ctx->status = st; |
258 | 0 | return; |
259 | 0 | } |
260 | 19.3k | while (evbuffer_get_length(evbuf) > 0) { |
261 | 9.68k | ByteBufferPtr bb; |
262 | 9.68k | st = ByteBuffer::allocate(128 * 1024, &bb); |
263 | 9.68k | if (!st.ok()) { |
264 | 0 | ctx->status = st; |
265 | 0 | return; |
266 | 0 | } |
267 | 9.68k | auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); |
268 | 9.68k | bb->pos = remove_bytes; |
269 | 9.68k | bb->flip(); |
270 | 9.68k | st = ctx->body_sink->append(bb); |
271 | | // schema_buffer stores 1M of data for parsing column information |
272 | | // need to determine whether to cache for the first time |
273 | 9.68k | if (ctx->is_read_schema) { |
274 | 134 | if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { |
275 | 134 | ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); |
276 | 134 | } else { |
277 | 0 | LOG(INFO) << "use a portion of data to request fe to obtain column information"; |
278 | 0 | ctx->is_read_schema = false; |
279 | 0 | ctx->status = process_put(req, ctx); |
280 | 0 | } |
281 | 134 | } |
282 | 9.68k | if (!st.ok()) { |
283 | 0 | LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); |
284 | 0 | ctx->status = st; |
285 | 0 | return; |
286 | 0 | } |
287 | 9.68k | ctx->receive_bytes += remove_bytes; |
288 | 9.68k | } |
289 | | // after all the data has been read and it has not reached 1M, it will execute here |
290 | 9.68k | if (ctx->is_read_schema) { |
291 | 134 | LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " |
292 | 134 | << "here"; |
293 | 134 | ctx->is_read_schema = false; |
294 | 134 | ctx->status = process_put(req, ctx); |
295 | 134 | } |
296 | 9.68k | ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); |
297 | 9.68k | } |
298 | | |
299 | 134 | void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) { |
300 | 134 | std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param); |
301 | 134 | if (ctx == nullptr) { |
302 | 0 | return; |
303 | 0 | } |
304 | | // sender is gone, make receiver know it |
305 | 134 | if (ctx->body_sink != nullptr) { |
306 | 134 | ctx->body_sink->cancel("sender is gone"); |
307 | 134 | } |
308 | | // remove stream load context from stream load manager and the resource will be released |
309 | 134 | ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); |
310 | 134 | http_stream_current_processing->increment(-1); |
311 | 134 | } |
312 | | |
313 | | Status HttpStreamAction::process_put(HttpRequest* http_req, |
314 | 134 | std::shared_ptr<StreamLoadContext> ctx) { |
315 | 134 | TStreamLoadPutRequest request; |
316 | 134 | if (http_req != nullptr) { |
317 | 134 | request.__set_load_sql(http_req->header(HTTP_SQL)); |
318 | 134 | if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { |
319 | 0 | bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); |
320 | 0 | request.__set_memtable_on_sink_node(value); |
321 | 0 | } |
322 | 134 | } else { |
323 | 0 | request.__set_token(ctx->auth.token); |
324 | 0 | request.__set_load_sql(ctx->sql_str); |
325 | 0 | ctx->auth.token = ""; |
326 | 0 | } |
327 | 134 | set_request_auth(&request, ctx->auth); |
328 | 134 | request.__set_loadId(ctx->id.to_thrift()); |
329 | 134 | request.__set_label(ctx->label); |
330 | 134 | if (ctx->group_commit) { |
331 | 13 | if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { |
332 | 13 | request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); |
333 | 13 | } else { |
334 | | // used for wait_internal_group_commit_finish |
335 | 0 | request.__set_group_commit_mode("sync_mode"); |
336 | 0 | } |
337 | 13 | } |
338 | 134 | if (_exec_env->cluster_info()->backend_id != 0) { |
339 | 134 | request.__set_backend_id(_exec_env->cluster_info()->backend_id); |
340 | 134 | } else { |
341 | 0 | LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; |
342 | 0 | } |
343 | 134 | if (ctx->wal_id > 0) { |
344 | 0 | request.__set_partial_update(false); |
345 | 0 | } |
346 | | |
347 | | // plan this load |
348 | 134 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
349 | 134 | int64_t stream_load_put_start_time = MonotonicNanos(); |
350 | 134 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
351 | 134 | master_addr.hostname, master_addr.port, |
352 | 134 | [&request, ctx](FrontendServiceConnection& client) { |
353 | 134 | client->streamLoadPut(ctx->put_result, request); |
354 | 134 | })); |
355 | 134 | ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false); |
356 | 134 | ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; |
357 | 134 | Status plan_status(Status::create(ctx->put_result.status)); |
358 | 134 | if (!plan_status.ok()) { |
359 | 20 | LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); |
360 | 20 | return plan_status; |
361 | 20 | } |
362 | 114 | if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { |
363 | 1 | return Status::NotSupported("http stream 2pc is unsupported for mow table"); |
364 | 1 | } |
365 | 113 | ctx->db = ctx->put_result.pipeline_params.db_name; |
366 | 113 | ctx->table = ctx->put_result.pipeline_params.table_name; |
367 | 113 | ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id; |
368 | 113 | ctx->label = ctx->put_result.pipeline_params.import_label; |
369 | 113 | ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id); |
370 | 113 | if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { |
371 | | // FIXME find a way to avoid chunked stream load write large WALs |
372 | 11 | size_t content_length = 0; |
373 | 11 | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
374 | 9 | try { |
375 | 9 | content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
376 | 9 | } catch (const std::exception& e) { |
377 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
378 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), |
379 | 0 | e.what()); |
380 | 0 | } |
381 | 9 | if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || |
382 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_LZO || |
383 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || |
384 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME || |
385 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_LZOP || |
386 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK || |
387 | 9 | ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) { |
388 | 0 | content_length *= 3; |
389 | 0 | } |
390 | 9 | } |
391 | 11 | ctx->put_result.pipeline_params.__set_content_length(content_length); |
392 | 11 | } |
393 | 113 | TPipelineFragmentParamsList mocked; |
394 | 113 | return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked); |
395 | 113 | } |
396 | | |
397 | | void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, |
398 | 110 | const std::string& str) { |
399 | 110 | std::shared_ptr<StreamLoadRecorder> stream_load_recorder = |
400 | 110 | ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); |
401 | | |
402 | 110 | if (stream_load_recorder != nullptr) { |
403 | 110 | std::string key = |
404 | 110 | std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; |
405 | 110 | auto st = stream_load_recorder->put(key, str); |
406 | 110 | if (st.ok()) { |
407 | 110 | LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label |
408 | 110 | << ", key: " << key; |
409 | 110 | } |
410 | 110 | } else { |
411 | 0 | LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null."; |
412 | 0 | } |
413 | 110 | } |
414 | | |
415 | | Status HttpStreamAction::_handle_group_commit(HttpRequest* req, |
416 | 134 | std::shared_ptr<StreamLoadContext> ctx) { |
417 | 134 | std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); |
418 | 134 | if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && |
419 | 134 | !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { |
420 | 0 | return Status::InvalidArgument( |
421 | 0 | "group_commit can only be [async_mode, sync_mode, off_mode]"); |
422 | 0 | } |
423 | 134 | if (config::wait_internal_group_commit_finish) { |
424 | 0 | group_commit_mode = "sync_mode"; |
425 | 0 | } |
426 | 134 | int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() |
427 | 134 | ? 0 |
428 | 134 | : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); |
429 | 134 | if (content_length < 0) { |
430 | 0 | std::stringstream ss; |
431 | 0 | ss << "This http load content length <0 (" << content_length |
432 | 0 | << "), please check your content length."; |
433 | 0 | LOG(WARNING) << ss.str(); |
434 | 0 | return Status::InvalidArgument(ss.str()); |
435 | 0 | } |
436 | | // allow chunked stream load in flink |
437 | 134 | auto is_chunk = |
438 | 134 | !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && |
439 | 134 | req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos; |
440 | 134 | if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || |
441 | 134 | (content_length == 0 && !is_chunk)) { |
442 | | // off_mode and empty |
443 | 121 | ctx->group_commit = false; |
444 | 121 | return Status::OK(); |
445 | 121 | } |
446 | 13 | if (is_chunk) { |
447 | 2 | ctx->label = ""; |
448 | 2 | } |
449 | | |
450 | 13 | auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && |
451 | 13 | iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); |
452 | 13 | auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); |
453 | 13 | auto partitions = !req->header(HTTP_PARTITIONS).empty(); |
454 | 13 | if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) { |
455 | 13 | if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { |
456 | 0 | return Status::InvalidArgument("label and group_commit can't be set at the same time"); |
457 | 0 | } |
458 | 13 | ctx->group_commit = true; |
459 | 13 | if (iequal(group_commit_mode, "async_mode")) { |
460 | 12 | if (!load_size_smaller_than_wal_limit(content_length)) { |
461 | 0 | std::stringstream ss; |
462 | 0 | ss << "There is no space for group commit http load async WAL. This http load " |
463 | 0 | "size is " |
464 | 0 | << content_length << ". WAL dir info: " |
465 | 0 | << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); |
466 | 0 | LOG(WARNING) << ss.str(); |
467 | 0 | return Status::Error<EXCEEDED_LIMIT>(ss.str()); |
468 | 0 | } |
469 | 12 | } |
470 | 13 | } |
471 | 13 | return Status::OK(); |
472 | 13 | } |
473 | | |
474 | | } // namespace doris |