be/src/service/http/action/http_stream.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/http_stream.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <cstddef> |
22 | | #include <future> |
23 | | #include <sstream> |
24 | | |
25 | | // use string iequal |
26 | | #include <event2/buffer.h> |
27 | | #include <event2/bufferevent.h> |
28 | | #include <event2/http.h> |
29 | | #include <gen_cpp/FrontendService.h> |
30 | | #include <gen_cpp/FrontendService_types.h> |
31 | | #include <gen_cpp/HeartbeatService_types.h> |
32 | | #include <rapidjson/prettywriter.h> |
33 | | #include <thrift/protocol/TDebugProtocol.h> |
34 | | |
35 | | #include "cloud/config.h" |
36 | | #include "common/config.h" |
37 | | #include "common/logging.h" |
38 | | #include "common/metrics/doris_metrics.h" |
39 | | #include "common/metrics/metrics.h" |
40 | | #include "common/status.h" |
41 | | #include "common/utils.h" |
42 | | #include "io/fs/stream_load_pipe.h" |
43 | | #include "load/group_commit/group_commit_mgr.h" |
44 | | #include "load/load_path_mgr.h" |
45 | | #include "load/stream_load/new_load_stream_mgr.h" |
46 | | #include "load/stream_load/stream_load_context.h" |
47 | | #include "load/stream_load/stream_load_executor.h" |
48 | | #include "load/stream_load/stream_load_recorder.h" |
49 | | #include "runtime/exec_env.h" |
50 | | #include "runtime/fragment_mgr.h" |
51 | | #include "service/http/http_channel.h" |
52 | | #include "service/http/http_common.h" |
53 | | #include "service/http/http_headers.h" |
54 | | #include "service/http/http_request.h" |
55 | | #include "service/http/utils.h" |
56 | | #include "storage/storage_engine.h" |
57 | | #include "util/byte_buffer.h" |
58 | | #include "util/client_cache.h" |
59 | | #include "util/load_util.h" |
60 | | #include "util/string_util.h" |
61 | | #include "util/thrift_rpc_helper.h" |
62 | | #include "util/time.h" |
63 | | #include "util/uid_util.h" |
64 | | |
65 | | namespace doris { |
66 | | using namespace ErrorCode; |
67 | | |
68 | | namespace { |
69 | | |
70 | 10 | bool is_compressed_file_scan(const TPipelineFragmentParams& params) { |
71 | 10 | if (!params.__isset.file_scan_params) { |
72 | 0 | return false; |
73 | 0 | } |
74 | 10 | return std::ranges::any_of(params.file_scan_params, [](const auto& file_scan_param) { |
75 | 10 | const auto& file_scan_params = file_scan_param.second; |
76 | 10 | TFileCompressType::type compress_type = file_scan_params.__isset.compress_type |
77 | 10 | ? file_scan_params.compress_type |
78 | 10 | : TFileCompressType::UNKNOWN; |
79 | 10 | TFileFormatType::type format_type = file_scan_params.__isset.format_type |
80 | 10 | ? file_scan_params.format_type |
81 | 10 | : TFileFormatType::FORMAT_UNKNOWN; |
82 | 10 | return LoadUtil::is_compressed_load(compress_type, format_type); |
83 | 10 | }); |
84 | 10 | } |
85 | | |
86 | | } // namespace |
87 | | |
// Metric prototypes for the "http_stream" entity; they are registered in the
// HttpStreamAction constructor.
// Incremented once per successfully handled request in HttpStreamAction::handle.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS);
// Accumulates ctx->load_cost_millis of successfully handled requests.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS);
// Incremented in on_header and decremented in free_handler_ctx.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS);
91 | | |
92 | | HttpStreamAction::HttpStreamAction(ExecEnv* exec_env) |
93 | 8 | : HttpHandlerWithAuth(exec_env, TPrivilegeHier::GLOBAL, TPrivilegeType::LOAD) { |
94 | | // Use LOAD privilege type: requires LOAD permission |
95 | | // Note: _exec_env is set by parent class HttpHandlerWithAuth |
96 | 8 | _http_stream_entity = |
97 | 8 | DorisMetrics::instance()->metric_registry()->register_entity("http_stream"); |
98 | 8 | INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_requests_total); |
99 | 8 | INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_duration_ms); |
100 | 8 | INT_GAUGE_METRIC_REGISTER(_http_stream_entity, http_stream_current_processing); |
101 | 8 | } |
102 | | |
103 | 4 | HttpStreamAction::~HttpStreamAction() { |
104 | 4 | DorisMetrics::instance()->metric_registry()->deregister_entity(_http_stream_entity); |
105 | 4 | } |
106 | | |
107 | 133 | void HttpStreamAction::handle(HttpRequest* req) { |
108 | 133 | std::shared_ptr<StreamLoadContext> ctx = |
109 | 133 | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
110 | 133 | if (ctx == nullptr) { |
111 | 0 | return; |
112 | 0 | } |
113 | | |
114 | | // status already set to fail |
115 | 133 | if (ctx->status.ok()) { |
116 | 112 | ctx->status = _handle(req, ctx); |
117 | 112 | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
118 | 3 | LOG(WARNING) << "handle streaming load failed, id=" << ctx->id |
119 | 3 | << ", errmsg=" << ctx->status; |
120 | 3 | } |
121 | 112 | } |
122 | 133 | ctx->load_cost_millis = UnixMillis() - ctx->start_millis; |
123 | | |
124 | 133 | if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { |
125 | 24 | if (ctx->body_sink != nullptr) { |
126 | 24 | ctx->body_sink->cancel(ctx->status.to_string()); |
127 | 24 | } |
128 | 24 | } |
129 | | |
130 | 133 | if (!ctx->status.ok()) { |
131 | 24 | auto str = std::string(ctx->to_json()); |
132 | | // add new line at end |
133 | 24 | str = str + '\n'; |
134 | 24 | HttpChannel::send_reply(req, str); |
135 | 24 | return; |
136 | 24 | } |
137 | 109 | auto str = std::string(ctx->to_json()); |
138 | | // add new line at end |
139 | 109 | str = str + '\n'; |
140 | 109 | HttpChannel::send_reply(req, str); |
141 | 109 | if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { |
142 | 109 | if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { |
143 | 109 | str = ctx->prepare_stream_load_record(str); |
144 | 109 | _save_stream_load_record(ctx, str); |
145 | 109 | } |
146 | 109 | } |
147 | | // update statistics |
148 | 109 | http_stream_requests_total->increment(1); |
149 | 109 | http_stream_duration_ms->increment(ctx->load_cost_millis); |
150 | 109 | } |
151 | | |
152 | 112 | Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { |
153 | 112 | if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { |
154 | 0 | LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes |
155 | 0 | << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; |
156 | 0 | return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes"); |
157 | 0 | } |
158 | 112 | RETURN_IF_ERROR(ctx->body_sink->finish()); |
159 | | |
160 | | // wait stream load finish |
161 | 112 | RETURN_IF_ERROR(ctx->load_status_future.get()); |
162 | | |
163 | 109 | if (ctx->group_commit) { |
164 | 12 | LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string(); |
165 | 12 | return Status::OK(); |
166 | 12 | } |
167 | | |
168 | 97 | if (ctx->two_phase_commit) { |
169 | 1 | int64_t pre_commit_start_time = MonotonicNanos(); |
170 | 1 | RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get())); |
171 | 1 | ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time; |
172 | 96 | } else { |
173 | | // If put file success we need commit this load |
174 | 96 | int64_t commit_and_publish_start_time = MonotonicNanos(); |
175 | 96 | RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get())); |
176 | 96 | ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time; |
177 | 96 | } |
178 | 97 | return Status::OK(); |
179 | 97 | } |
180 | | |
181 | 133 | int HttpStreamAction::on_header(HttpRequest* req) { |
182 | | // Call parent's auth check first |
183 | 133 | int ret = HttpHandlerWithAuth::on_header(req); |
184 | 133 | if (ret != 0) { |
185 | 0 | return ret; // Auth failed, return error |
186 | 0 | } |
187 | | |
188 | 133 | http_stream_current_processing->increment(1); |
189 | | |
190 | 133 | std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); |
191 | 133 | req->set_handler_ctx(ctx); |
192 | | |
193 | 133 | ctx->load_type = TLoadType::MANUL_LOAD; |
194 | 133 | ctx->load_src_type = TLoadSourceType::RAW; |
195 | 133 | ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true"; |
196 | 133 | Status st = _handle_group_commit(req, ctx); |
197 | | |
198 | 133 | LOG(INFO) << "new income streaming load request." << ctx->brief() |
199 | 133 | << " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit; |
200 | 133 | if (st.ok()) { |
201 | 133 | st = _on_header(req, ctx); |
202 | 133 | } |
203 | 133 | if (!st.ok()) { |
204 | 0 | ctx->status = std::move(st); |
205 | 0 | if (ctx->body_sink != nullptr) { |
206 | 0 | ctx->body_sink->cancel(ctx->status.to_string()); |
207 | 0 | } |
208 | 0 | auto str = ctx->to_json(); |
209 | | // add new line at end |
210 | 0 | str = str + '\n'; |
211 | 0 | HttpChannel::send_reply(req, str); |
212 | 0 | if (config::enable_stream_load_record || |
213 | 0 | config::enable_stream_load_record_to_audit_log_table) { |
214 | 0 | if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { |
215 | 0 | str = ctx->prepare_stream_load_record(str); |
216 | 0 | _save_stream_load_record(ctx, str); |
217 | 0 | } |
218 | 0 | } |
219 | 0 | return -1; |
220 | 0 | } |
221 | 133 | return 0; |
222 | 133 | } |
223 | | |
224 | 133 | Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) { |
225 | | // auth information |
226 | 133 | if (!parse_basic_auth(*http_req, &ctx->auth)) { |
227 | 0 | LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); |
228 | 0 | return Status::NotAuthorized("no valid Basic authorization"); |
229 | 0 | } |
230 | | |
231 | | // TODO(zs) : need Need to request an FE to obtain information such as format |
232 | | // check content length |
233 | 133 | ctx->body_bytes = 0; |
234 | 133 | size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; |
235 | 133 | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
236 | 130 | try { |
237 | 130 | ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
238 | 130 | } catch (const std::exception& e) { |
239 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
240 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), e.what()); |
241 | 0 | } |
242 | | // csv max body size |
243 | 130 | if (ctx->body_bytes > csv_max_body_bytes) { |
244 | 0 | LOG(WARNING) << "body exceed max size." << ctx->brief(); |
245 | 0 | return Status::Error<ErrorCode::EXCEEDED_LIMIT>( |
246 | 0 | "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you " |
247 | 0 | "are sure this load is reasonable", |
248 | 0 | ctx->body_bytes, csv_max_body_bytes); |
249 | 0 | } |
250 | 130 | } |
251 | | |
252 | 133 | auto pipe = std::make_shared<io::StreamLoadPipe>( |
253 | 133 | io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, |
254 | 133 | ctx->body_bytes /* total_length */); |
255 | 133 | ctx->body_sink = pipe; |
256 | 133 | ctx->pipe = pipe; |
257 | | |
258 | 133 | RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx)); |
259 | | |
260 | | // Here, transactions are set from fe's NativeInsertStmt. |
261 | | // TODO(zs) : How to support two_phase_commit |
262 | | |
263 | 133 | return Status::OK(); |
264 | 133 | } |
265 | | |
266 | 9.85k | void HttpStreamAction::on_chunk_data(HttpRequest* req) { |
267 | 9.85k | std::shared_ptr<StreamLoadContext> ctx = |
268 | 9.85k | std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); |
269 | 9.85k | if (ctx == nullptr || !ctx->status.ok()) { |
270 | 8 | return; |
271 | 8 | } |
272 | 9.84k | if (!req->header(HTTP_WAL_ID_KY).empty()) { |
273 | 0 | ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY)); |
274 | 0 | } |
275 | 9.84k | struct evhttp_request* ev_req = req->get_evhttp_request(); |
276 | 9.84k | auto evbuf = evhttp_request_get_input_buffer(ev_req); |
277 | | |
278 | | // In HttpStreamAction::on_chunk_data |
279 | | // -> process_put |
280 | | // -> StreamLoadExecutor::execute_plan_fragment |
281 | | // -> exec_plan_fragment |
282 | | // , SCOPED_ATTACH_TASK will be called. |
283 | 9.84k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->stream_load_pipe_tracker()); |
284 | | |
285 | 9.84k | int64_t start_read_data_time = MonotonicNanos(); |
286 | 9.84k | Status st = ctx->allocate_schema_buffer(); |
287 | 9.84k | if (!st.ok()) { |
288 | 0 | ctx->status = st; |
289 | 0 | return; |
290 | 0 | } |
291 | 19.6k | while (evbuffer_get_length(evbuf) > 0) { |
292 | 9.84k | ByteBufferPtr bb; |
293 | 9.84k | st = ByteBuffer::allocate(128 * 1024, &bb); |
294 | 9.84k | if (!st.ok()) { |
295 | 0 | ctx->status = st; |
296 | 0 | return; |
297 | 0 | } |
298 | 9.84k | auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); |
299 | 9.84k | bb->pos = remove_bytes; |
300 | 9.84k | bb->flip(); |
301 | 9.84k | st = ctx->body_sink->append(bb); |
302 | | // schema_buffer stores 1M of data for parsing column information |
303 | | // need to determine whether to cache for the first time |
304 | 9.84k | if (ctx->is_read_schema) { |
305 | 133 | if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { |
306 | 133 | ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); |
307 | 133 | } else { |
308 | 0 | LOG(INFO) << "use a portion of data to request fe to obtain column information"; |
309 | 0 | ctx->is_read_schema = false; |
310 | 0 | ctx->status = process_put(req, ctx); |
311 | 0 | } |
312 | 133 | } |
313 | 9.84k | if (!st.ok()) { |
314 | 0 | LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); |
315 | 0 | ctx->status = st; |
316 | 0 | return; |
317 | 0 | } |
318 | 9.84k | ctx->receive_bytes += remove_bytes; |
319 | 9.84k | } |
320 | | // after all the data has been read and it has not reached 1M, it will execute here |
321 | 9.84k | if (ctx->is_read_schema) { |
322 | 133 | LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute " |
323 | 133 | << "here"; |
324 | 133 | ctx->is_read_schema = false; |
325 | 133 | ctx->status = process_put(req, ctx); |
326 | 133 | } |
327 | 9.84k | ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time); |
328 | 9.84k | } |
329 | | |
330 | 133 | void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) { |
331 | 133 | std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param); |
332 | 133 | if (ctx == nullptr) { |
333 | 0 | return; |
334 | 0 | } |
335 | | // sender is gone, make receiver know it |
336 | 133 | if (ctx->body_sink != nullptr) { |
337 | 133 | ctx->body_sink->cancel("sender is gone"); |
338 | 133 | } |
339 | | // remove stream load context from stream load manager and the resource will be released |
340 | 133 | ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); |
341 | 133 | http_stream_current_processing->increment(-1); |
342 | 133 | } |
343 | | |
344 | | Status HttpStreamAction::process_put(HttpRequest* http_req, |
345 | 133 | std::shared_ptr<StreamLoadContext> ctx) { |
346 | 133 | TStreamLoadPutRequest request; |
347 | 133 | if (http_req != nullptr) { |
348 | 133 | request.__set_load_sql(http_req->header(HTTP_SQL)); |
349 | 133 | if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) { |
350 | 0 | bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true"); |
351 | 0 | request.__set_memtable_on_sink_node(value); |
352 | 0 | } |
353 | 133 | } else { |
354 | 0 | request.__set_token(ctx->auth.token); |
355 | 0 | request.__set_load_sql(ctx->sql_str); |
356 | 0 | ctx->auth.token = ""; |
357 | 0 | } |
358 | 133 | set_request_auth(&request, ctx->auth); |
359 | 133 | request.__set_loadId(ctx->id.to_thrift()); |
360 | 133 | request.__set_label(ctx->label); |
361 | 133 | if (ctx->group_commit) { |
362 | 13 | if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { |
363 | 13 | request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); |
364 | 13 | } else { |
365 | | // used for wait_internal_group_commit_finish |
366 | 0 | request.__set_group_commit_mode("sync_mode"); |
367 | 0 | } |
368 | 13 | } |
369 | 133 | if (_exec_env->cluster_info()->backend_id != 0) { |
370 | 133 | request.__set_backend_id(_exec_env->cluster_info()->backend_id); |
371 | 133 | } else { |
372 | 0 | LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; |
373 | 0 | } |
374 | 133 | if (ctx->wal_id > 0) { |
375 | 0 | request.__set_partial_update(false); |
376 | 0 | } |
377 | | |
378 | | // plan this load |
379 | 133 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
380 | 133 | int64_t stream_load_put_start_time = MonotonicNanos(); |
381 | 133 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
382 | 133 | master_addr.hostname, master_addr.port, |
383 | 133 | [&request, ctx](FrontendServiceConnection& client) { |
384 | 133 | client->streamLoadPut(ctx->put_result, request); |
385 | 133 | })); |
386 | 133 | ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false); |
387 | 133 | ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time; |
388 | 133 | Status plan_status(Status::create(ctx->put_result.status)); |
389 | 133 | if (!plan_status.ok()) { |
390 | 20 | LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief(); |
391 | 20 | return plan_status; |
392 | 20 | } |
393 | 113 | if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { |
394 | 1 | return Status::NotSupported("http stream 2pc is unsupported for mow table"); |
395 | 1 | } |
396 | 112 | ctx->db = ctx->put_result.pipeline_params.db_name; |
397 | 112 | ctx->table = ctx->put_result.pipeline_params.table_name; |
398 | 112 | ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id; |
399 | 112 | ctx->label = ctx->put_result.pipeline_params.import_label; |
400 | 112 | ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id); |
401 | 112 | if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { |
402 | | // FIXME find a way to avoid chunked stream load write large WALs |
403 | 12 | size_t content_length = 0; |
404 | 12 | if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { |
405 | 10 | try { |
406 | 10 | content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); |
407 | 10 | } catch (const std::exception& e) { |
408 | 0 | return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}", |
409 | 0 | http_req->header(HttpHeaders::CONTENT_LENGTH), |
410 | 0 | e.what()); |
411 | 0 | } |
412 | 10 | if (is_compressed_file_scan(ctx->put_result.pipeline_params)) { |
413 | 5 | content_length *= 3; |
414 | 5 | } |
415 | 10 | } |
416 | 12 | ctx->put_result.pipeline_params.__set_content_length(content_length); |
417 | 12 | } |
418 | 112 | TPipelineFragmentParamsList mocked; |
419 | 112 | return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked); |
420 | 112 | } |
421 | | |
422 | | void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, |
423 | 109 | const std::string& str) { |
424 | 109 | std::shared_ptr<StreamLoadRecorder> stream_load_recorder = |
425 | 109 | ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); |
426 | | |
427 | 109 | if (stream_load_recorder != nullptr) { |
428 | 109 | std::string key = |
429 | 109 | std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; |
430 | 109 | auto st = stream_load_recorder->put(key, str); |
431 | 109 | if (st.ok()) { |
432 | 109 | LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label |
433 | 109 | << ", key: " << key; |
434 | 109 | } |
435 | 109 | } else { |
436 | 0 | LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null."; |
437 | 0 | } |
438 | 109 | } |
439 | | |
440 | | Status HttpStreamAction::_handle_group_commit(HttpRequest* req, |
441 | 133 | std::shared_ptr<StreamLoadContext> ctx) { |
442 | 133 | std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); |
443 | 133 | if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && |
444 | 133 | !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { |
445 | 0 | return Status::InvalidArgument( |
446 | 0 | "group_commit can only be [async_mode, sync_mode, off_mode]"); |
447 | 0 | } |
448 | 133 | if (config::wait_internal_group_commit_finish) { |
449 | 0 | group_commit_mode = "sync_mode"; |
450 | 0 | } |
451 | 133 | int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() |
452 | 133 | ? 0 |
453 | 133 | : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); |
454 | 133 | if (content_length < 0) { |
455 | 0 | std::stringstream ss; |
456 | 0 | ss << "This http load content length <0 (" << content_length |
457 | 0 | << "), please check your content length."; |
458 | 0 | LOG(WARNING) << ss.str(); |
459 | 0 | return Status::InvalidArgument(ss.str()); |
460 | 0 | } |
461 | | // allow chunked stream load in flink |
462 | 133 | auto is_chunk = |
463 | 133 | !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && |
464 | 133 | req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos; |
465 | 133 | if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || |
466 | 133 | (content_length == 0 && !is_chunk)) { |
467 | | // off_mode and empty |
468 | 120 | ctx->group_commit = false; |
469 | 120 | return Status::OK(); |
470 | 120 | } |
471 | 13 | if (is_chunk) { |
472 | 2 | ctx->label = ""; |
473 | 2 | } |
474 | | |
475 | 13 | auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && |
476 | 13 | iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); |
477 | 13 | auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); |
478 | 13 | auto partitions = !req->header(HTTP_PARTITIONS).empty(); |
479 | 13 | if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) { |
480 | 13 | if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { |
481 | 0 | return Status::InvalidArgument("label and group_commit can't be set at the same time"); |
482 | 0 | } |
483 | 13 | ctx->group_commit = true; |
484 | 13 | if (iequal(group_commit_mode, "async_mode")) { |
485 | 13 | if (!load_size_smaller_than_wal_limit(content_length)) { |
486 | 0 | std::stringstream ss; |
487 | 0 | ss << "There is no space for group commit http load async WAL. This http load " |
488 | 0 | "size is " |
489 | 0 | << content_length << ". WAL dir info: " |
490 | 0 | << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); |
491 | 0 | LOG(WARNING) << ss.str(); |
492 | 0 | return Status::Error<EXCEEDED_LIMIT>(ss.str()); |
493 | 0 | } |
494 | 13 | } |
495 | 13 | } |
496 | 13 | return Status::OK(); |
497 | 13 | } |
498 | | |
499 | | } // namespace doris |