Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/stream_load.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/stream_load.h"
19
20
// use string iequal
21
#include <event2/buffer.h>
22
#include <event2/http.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/FrontendService_types.h>
25
#include <gen_cpp/HeartbeatService_types.h>
26
#include <gen_cpp/PaloInternalService_types.h>
27
#include <gen_cpp/PlanNodes_types.h>
28
#include <gen_cpp/Types_types.h>
29
#include <sys/time.h>
30
#include <thrift/protocol/TDebugProtocol.h>
31
32
#include <algorithm>
33
#include <cstdint>
34
#include <cstdlib>
35
#include <ctime>
36
#include <functional>
37
#include <future>
38
#include <sstream>
39
#include <stdexcept>
40
#include <utility>
41
42
#include "cloud/config.h"
43
#include "common/config.h"
44
#include "common/consts.h"
45
#include "common/logging.h"
46
#include "common/metrics/doris_metrics.h"
47
#include "common/metrics/metrics.h"
48
#include "common/status.h"
49
#include "common/utils.h"
50
#include "io/fs/stream_load_pipe.h"
51
#include "load/group_commit/group_commit_mgr.h"
52
#include "load/load_path_mgr.h"
53
#include "load/message_body_sink.h"
54
#include "load/stream_load/new_load_stream_mgr.h"
55
#include "load/stream_load/stream_load_context.h"
56
#include "load/stream_load/stream_load_executor.h"
57
#include "load/stream_load/stream_load_recorder.h"
58
#include "runtime/exec_env.h"
59
#include "service/http/http_channel.h"
60
#include "service/http/http_common.h"
61
#include "service/http/http_headers.h"
62
#include "service/http/http_request.h"
63
#include "service/http/utils.h"
64
#include "storage/storage_engine.h"
65
#include "util/byte_buffer.h"
66
#include "util/client_cache.h"
67
#include "util/load_util.h"
68
#include "util/string_util.h"
69
#include "util/thrift_rpc_helper.h"
70
#include "util/time.h"
71
#include "util/uid_util.h"
72
#include "util/url_coding.h"
73
74
namespace doris {
75
using namespace ErrorCode;
76
77
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
78
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
79
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
80
81
bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
82
bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load",
83
                                                                  "commit_and_publish_ms");
84
85
static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
86
static const std::string CHUNK = "chunked";
87
88
#ifdef BE_TEST
89
TStreamLoadPutResult k_stream_load_put_result;
90
#endif
91
92
8
StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
93
8
    _stream_load_entity =
94
8
            DorisMetrics::instance()->metric_registry()->register_entity("stream_load");
95
8
    INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total);
96
8
    INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms);
97
8
    INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing);
98
8
}
99
100
4
StreamLoadAction::~StreamLoadAction() {
101
4
    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity);
102
4
}
103
104
2.10k
void StreamLoadAction::handle(HttpRequest* req) {
105
2.10k
    std::shared_ptr<StreamLoadContext> ctx =
106
2.10k
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
107
2.10k
    if (ctx == nullptr) {
108
0
        return;
109
0
    }
110
111
2.10k
    {
112
2.10k
        std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
113
2.10k
        ctx->_can_send_reply = true;
114
2.10k
        ctx->_can_send_reply_cv.notify_all();
115
2.10k
    }
116
117
    // status already set to fail
118
2.10k
    if (ctx->status.ok()) {
119
2.10k
        ctx->status = _handle(ctx, req);
120
2.10k
        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
121
0
            _send_reply(ctx, req);
122
0
        }
123
2.10k
    }
124
2.10k
}
125
126
2.10k
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
127
2.10k
    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
128
0
        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
129
0
                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
130
0
        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
131
0
    }
132
133
    // if we use non-streaming, MessageBodyFileSink.finish will close the file
134
2.10k
    RETURN_IF_ERROR(ctx->body_sink->finish());
135
2.10k
    if (!ctx->use_streaming) {
136
        // we need to close file first, then execute_plan_fragment here
137
21
        ctx->body_sink.reset();
138
21
        TPipelineFragmentParamsList mocked;
139
21
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
140
21
                ctx, mocked,
141
21
                [req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); }));
142
21
    }
143
144
2.10k
    return Status::OK();
145
2.10k
}
146
147
2.10k
void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
148
2.10k
    ctx->status = ctx->load_status_future.get();
149
2.10k
    if (ctx->status.ok()) {
150
1.78k
        if (ctx->group_commit) {
151
21
            LOG(INFO) << "skip commit because this is group commit, pipe_id="
152
21
                      << ctx->id.to_string();
153
1.76k
        } else if (ctx->two_phase_commit) {
154
32
            int64_t pre_commit_start_time = MonotonicNanos();
155
32
            ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
156
32
            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
157
1.72k
        } else {
158
            // If put file success we need commit this load
159
1.72k
            int64_t commit_and_publish_start_time = MonotonicNanos();
160
1.72k
            ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
161
1.72k
            ctx->commit_and_publish_txn_cost_nanos =
162
1.72k
                    MonotonicNanos() - commit_and_publish_start_time;
163
1.72k
            g_stream_load_commit_and_publish_latency_ms
164
1.72k
                    << ctx->commit_and_publish_txn_cost_nanos / 1000000;
165
1.72k
        }
166
1.78k
    }
167
2.10k
    _send_reply(ctx, req);
168
2.10k
}
169
170
2.28k
void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
171
2.28k
    std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
172
    // 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle complete,
173
    //    avoid client errors (e.g., broken pipe).
174
    // 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled
175
    //    due to long import execution time.
176
2.28k
    while (!ctx->_finish_send_reply && !ctx->_can_send_reply) {
177
2
        ctx->_can_send_reply_cv.wait(lock1);
178
2
    }
179
2.28k
    if (ctx->_finish_send_reply) {
180
0
        return;
181
0
    }
182
2.28k
    DCHECK(ctx->_can_send_reply);
183
2.28k
    ctx->_finish_send_reply = true;
184
2.28k
    ctx->_can_send_reply_cv.notify_all();
185
2.28k
    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
186
187
2.28k
    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
188
506
        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
189
506
                     << ", errmsg=" << ctx->status;
190
506
        if (ctx->need_rollback) {
191
475
            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
192
475
            ctx->need_rollback = false;
193
475
        }
194
506
        if (ctx->body_sink != nullptr) {
195
475
            ctx->body_sink->cancel(ctx->status.to_string());
196
475
        }
197
506
    }
198
199
2.28k
    auto str = ctx->to_json();
200
    // add new line at end
201
2.28k
    str = str + '\n';
202
203
2.28k
#ifndef BE_TEST
204
2.28k
    if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
205
2.28k
        if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
206
2.13k
            str = ctx->prepare_stream_load_record(str);
207
2.13k
            _save_stream_load_record(ctx, str);
208
2.13k
        }
209
2.28k
    }
210
2.28k
#endif
211
212
2.28k
    HttpChannel::send_reply(req, str);
213
214
2.28k
    LOG(INFO) << "finished to execute stream load. label=" << ctx->label
215
2.28k
              << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
216
2.28k
              << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
217
2.28k
              << (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
218
2.28k
              << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
219
2.28k
              << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000
220
2.28k
              << ", commit_and_publish_txn_cost_ms="
221
2.28k
              << ctx->commit_and_publish_txn_cost_nanos / 1000000
222
2.28k
              << ", number_total_rows=" << ctx->number_total_rows
223
2.28k
              << ", number_loaded_rows=" << ctx->number_loaded_rows
224
2.28k
              << ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes
225
2.28k
              << ", error_url=" << ctx->error_url;
226
227
    // update statistics
228
2.28k
    streaming_load_requests_total->increment(1);
229
2.28k
    streaming_load_duration_ms->increment(ctx->load_cost_millis);
230
2.28k
    if (!ctx->data_saved_path.empty()) {
231
22
        _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path);
232
22
    }
233
2.28k
}
234
235
2.28k
int StreamLoadAction::on_header(HttpRequest* req) {
236
2.28k
    req->mark_send_reply();
237
238
2.28k
    streaming_load_current_processing->increment(1);
239
240
2.28k
    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
241
2.28k
    req->set_handler_ctx(ctx);
242
243
2.28k
    ctx->load_type = TLoadType::MANUL_LOAD;
244
2.28k
    ctx->load_src_type = TLoadSourceType::RAW;
245
246
2.28k
    url_decode(req->param(HTTP_DB_KEY), &ctx->db);
247
2.28k
    url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
248
2.28k
    ctx->label = req->header(HTTP_LABEL_KEY);
249
2.28k
    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
250
2.28k
    Status st = _handle_group_commit(req, ctx);
251
2.28k
    if (!ctx->group_commit && ctx->label.empty()) {
252
96
        ctx->label = generate_uuid_string();
253
96
    }
254
255
2.28k
    LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
256
2.28k
              << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit
257
2.28k
              << ", HTTP headers=" << req->get_all_headers();
258
2.28k
    ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
259
260
2.28k
    if (st.ok()) {
261
2.28k
        st = _on_header(req, ctx);
262
2.28k
        LOG(INFO) << "finished to handle HTTP header, " << ctx->brief();
263
2.28k
    }
264
2.28k
    if (!st.ok()) {
265
180
        ctx->status = std::move(st);
266
180
        {
267
180
            std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
268
180
            ctx->_can_send_reply = true;
269
180
            ctx->_can_send_reply_cv.notify_all();
270
180
        }
271
180
        _send_reply(ctx, req);
272
180
        return -1;
273
180
    }
274
2.10k
    return 0;
275
2.28k
}
276
277
2.28k
Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
278
    // auth information
279
2.28k
    if (!parse_basic_auth(*http_req, &ctx->auth)) {
280
0
        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
281
0
        return Status::NotAuthorized("no valid Basic authorization");
282
0
    }
283
284
    // get format of this put
285
2.28k
    std::string format_str = http_req->header(HTTP_FORMAT_KEY);
286
2.28k
    if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
287
2.28k
        iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
288
12
        ctx->header_type = format_str;
289
        //treat as CSV
290
12
        format_str = BeConsts::CSV;
291
12
    }
292
2.28k
    LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
293
2.28k
                           &ctx->compress_type);
294
2.28k
    if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
295
1
        return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}",
296
1
                                                              http_req->header(HTTP_FORMAT_KEY));
297
1
    }
298
299
    // check content length
300
2.28k
    ctx->body_bytes = 0;
301
2.28k
    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
302
2.28k
    size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
303
2.28k
    bool read_json_by_line = false;
304
2.28k
    if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
305
419
        if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
306
417
            read_json_by_line = true;
307
417
        }
308
419
    }
309
2.28k
    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
310
1.97k
        try {
311
1.97k
            ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
312
1.97k
        } catch (const std::exception& e) {
313
0
            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
314
0
                                           http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
315
0
        }
316
        // json max body size
317
1.97k
        if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
318
1.97k
            (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
319
0
            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
320
0
                    "json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
321
0
                    "it if you are sure this load is reasonable",
322
0
                    ctx->body_bytes, json_max_body_bytes);
323
0
        }
324
        // csv max body size
325
1.97k
        else if (ctx->body_bytes > csv_max_body_bytes) {
326
0
            LOG(WARNING) << "body exceed max size." << ctx->brief();
327
0
            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
328
0
                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
329
0
                    "are sure this load is reasonable",
330
0
                    ctx->body_bytes, csv_max_body_bytes);
331
0
        }
332
1.97k
    } else {
333
314
#ifndef BE_TEST
334
314
        evhttp_connection_set_max_body_size(
335
314
                evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes);
336
314
#endif
337
314
    }
338
339
2.28k
    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
340
314
        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
341
314
            ctx->is_chunked_transfer = true;
342
314
        }
343
314
    }
344
2.28k
    if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
345
2.28k
                  !ctx->is_chunked_transfer))) {
346
1
        LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
347
1
                        "content_length or transfer-encoding=chunked";
348
1
        return Status::InvalidArgument(
349
1
                "content_length is empty and transfer-encoding!=chunked, please set content_length "
350
1
                "or transfer-encoding=chunked");
351
2.28k
    } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
352
2.28k
                        ctx->is_chunked_transfer)) {
353
1
        LOG(WARNING) << "please do not set both content_length and transfer-encoding";
354
1
        return Status::InvalidArgument(
355
1
                "please do not set both content_length and transfer-encoding");
356
1
    }
357
358
2.28k
    if (!http_req->header(HTTP_TIMEOUT).empty()) {
359
168
        ctx->timeout_second = DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT));
360
167
    }
361
2.28k
    if (!http_req->header(HTTP_COMMENT).empty()) {
362
1
        ctx->load_comment = http_req->header(HTTP_COMMENT);
363
1
    }
364
    // begin transaction
365
2.28k
    if (!ctx->group_commit) {
366
2.26k
        int64_t begin_txn_start_time = MonotonicNanos();
367
2.26k
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
368
2.23k
        ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
369
2.23k
    }
370
371
    // process put file
372
2.25k
    return _process_put(http_req, ctx);
373
2.28k
}
374
375
316k
void StreamLoadAction::on_chunk_data(HttpRequest* req) {
376
316k
    std::shared_ptr<StreamLoadContext> ctx =
377
316k
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
378
316k
    if (ctx == nullptr || !ctx->status.ok()) {
379
20.7k
        return;
380
20.7k
    }
381
382
296k
    struct evhttp_request* ev_req = req->get_evhttp_request();
383
296k
    auto evbuf = evhttp_request_get_input_buffer(ev_req);
384
385
296k
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
386
387
296k
    int64_t start_read_data_time = MonotonicNanos();
388
592k
    while (evbuffer_get_length(evbuf) > 0) {
389
296k
        ByteBufferPtr bb;
390
296k
        Status st = ByteBuffer::allocate(128 * 1024, &bb);
391
296k
        if (!st.ok()) {
392
0
            ctx->status = st;
393
0
            return;
394
0
        }
395
296k
        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
396
296k
        bb->pos = remove_bytes;
397
296k
        bb->flip();
398
296k
        st = ctx->body_sink->append(bb);
399
296k
        if (!st.ok()) {
400
3
            LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
401
3
            ctx->status = st;
402
3
            return;
403
3
        }
404
296k
        ctx->receive_bytes += remove_bytes;
405
296k
    }
406
296k
    int64_t read_data_time = MonotonicNanos() - start_read_data_time;
407
296k
    int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
408
296k
    ctx->read_data_cost_nanos += read_data_time;
409
296k
    ctx->receive_and_read_data_cost_nanos =
410
296k
            MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
411
296k
    g_stream_load_receive_data_latency_ms
412
296k
            << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
413
296k
                read_data_time) /
414
296k
                       1000000;
415
296k
}
416
417
2.28k
void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
418
2.28k
    std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param);
419
2.28k
    if (ctx == nullptr) {
420
0
        return;
421
0
    }
422
    // sender is gone, make receiver know it
423
2.28k
    if (ctx->body_sink != nullptr) {
424
2.23k
        ctx->body_sink->cancel("sender is gone");
425
2.23k
    }
426
    // remove stream load context from stream load manager and the resource will be released
427
2.28k
    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
428
2.28k
    streaming_load_current_processing->increment(-1);
429
2.28k
}
430
431
Status StreamLoadAction::_process_put(HttpRequest* http_req,
432
2.25k
                                      std::shared_ptr<StreamLoadContext> ctx) {
433
    // Now we use stream
434
2.25k
    ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
435
436
    // put request
437
2.25k
    TStreamLoadPutRequest request;
438
2.25k
    set_request_auth(&request, ctx->auth);
439
2.25k
    request.db = ctx->db;
440
2.25k
    request.tbl = ctx->table;
441
2.25k
    request.txnId = ctx->txn_id;
442
2.25k
    request.formatType = ctx->format;
443
2.25k
    request.__set_compress_type(ctx->compress_type);
444
2.25k
    request.__set_header_type(ctx->header_type);
445
2.25k
    request.__set_loadId(ctx->id.to_thrift());
446
2.25k
    if (ctx->use_streaming) {
447
2.23k
        std::shared_ptr<io::StreamLoadPipe> pipe;
448
2.23k
        if (ctx->is_chunked_transfer) {
449
313
            pipe = std::make_shared<io::StreamLoadPipe>(
450
313
                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
451
313
            pipe->set_is_chunked_transfer(true);
452
1.92k
        } else {
453
1.92k
            pipe = std::make_shared<io::StreamLoadPipe>(
454
1.92k
                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
455
1.92k
                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
456
1.92k
        }
457
2.23k
        request.fileType = TFileType::FILE_STREAM;
458
2.23k
        ctx->body_sink = pipe;
459
2.23k
        ctx->pipe = pipe;
460
2.23k
        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
461
2.23k
    } else {
462
22
        RETURN_IF_ERROR(_data_saved_path(http_req, &request.path, ctx->body_bytes));
463
22
        auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
464
22
        RETURN_IF_ERROR(file_sink->open());
465
22
        request.__isset.path = true;
466
22
        request.fileType = TFileType::FILE_LOCAL;
467
22
        request.__set_file_size(ctx->body_bytes);
468
22
        ctx->body_sink = file_sink;
469
22
        ctx->data_saved_path = request.path;
470
22
    }
471
2.25k
    if (!http_req->header(HTTP_COLUMNS).empty()) {
472
1.11k
        request.__set_columns(http_req->header(HTTP_COLUMNS));
473
1.11k
    }
474
2.25k
    if (!http_req->header(HTTP_WHERE).empty()) {
475
10
        request.__set_where(http_req->header(HTTP_WHERE));
476
10
    }
477
2.25k
    if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
478
1.53k
        request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
479
1.53k
    }
480
2.25k
    if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
481
126
        request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
482
126
    }
483
2.25k
    if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) {
484
80
        const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
485
80
        if (enclose_str.length() != 1) {
486
0
            return Status::InvalidArgument("enclose must be single-char, actually is {}",
487
0
                                           enclose_str);
488
0
        }
489
80
        request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
490
80
    }
491
2.25k
    if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) {
492
80
        const auto& escape_str = http_req->header(HTTP_ESCAPE);
493
80
        if (escape_str.length() != 1) {
494
0
            return Status::InvalidArgument("escape must be single-char, actually is {}",
495
0
                                           escape_str);
496
0
        }
497
80
        request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
498
80
    }
499
2.25k
    if (!http_req->header(HTTP_PARTITIONS).empty()) {
500
14
        request.__set_partitions(http_req->header(HTTP_PARTITIONS));
501
14
        request.__set_isTempPartition(false);
502
14
        if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
503
0
            return Status::InvalidArgument(
504
0
                    "Can not specify both partitions and temporary partitions");
505
0
        }
506
14
    }
507
2.25k
    if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
508
1
        request.__set_partitions(http_req->header(HTTP_TEMP_PARTITIONS));
509
1
        request.__set_isTempPartition(true);
510
1
        if (!http_req->header(HTTP_PARTITIONS).empty()) {
511
0
            return Status::InvalidArgument(
512
0
                    "Can not specify both partitions and temporary partitions");
513
0
        }
514
1
    }
515
2.25k
    if (!http_req->header(HTTP_NEGATIVE).empty() && http_req->header(HTTP_NEGATIVE) == "true") {
516
0
        request.__set_negative(true);
517
2.25k
    } else {
518
2.25k
        request.__set_negative(false);
519
2.25k
    }
520
2.25k
    bool strictMode = false;
521
2.25k
    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
522
348
        if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
523
217
            strictMode = false;
524
217
        } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
525
131
            strictMode = true;
526
131
        } else {
527
0
            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
528
0
        }
529
348
        request.__set_strictMode(strictMode);
530
348
    }
531
    // timezone first. if not, try system time_zone
532
2.25k
    if (!http_req->header(HTTP_TIMEZONE).empty()) {
533
17
        request.__set_timezone(http_req->header(HTTP_TIMEZONE));
534
2.24k
    } else if (!http_req->header(HTTP_TIME_ZONE).empty()) {
535
0
        request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
536
0
    }
537
2.25k
    if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
538
22
        try {
539
22
            request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
540
22
        } catch (const std::invalid_argument& e) {
541
7
            return Status::InvalidArgument("Invalid mem limit format, {}", e.what());
542
7
        }
543
22
    }
544
2.25k
    if (!http_req->header(HTTP_JSONPATHS).empty()) {
545
27
        request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS));
546
27
    }
547
2.25k
    if (!http_req->header(HTTP_JSONROOT).empty()) {
548
7
        request.__set_json_root(http_req->header(HTTP_JSONROOT));
549
7
    }
550
2.25k
    if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
551
76
        if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
552
70
            request.__set_strip_outer_array(true);
553
70
        } else {
554
6
            request.__set_strip_outer_array(false);
555
6
        }
556
2.17k
    } else {
557
2.17k
        request.__set_strip_outer_array(false);
558
2.17k
    }
559
560
2.25k
    if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
561
412
        if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
562
410
            request.__set_read_json_by_line(true);
563
410
        } else {
564
2
            request.__set_read_json_by_line(false);
565
2
        }
566
1.83k
    } else {
567
1.83k
        request.__set_read_json_by_line(false);
568
1.83k
    }
569
570
2.25k
    if (http_req->header(HTTP_READ_JSON_BY_LINE).empty() &&
571
2.25k
        http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
572
1.77k
        request.__set_read_json_by_line(true);
573
1.77k
        request.__set_strip_outer_array(false);
574
1.77k
    }
575
576
2.25k
    if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
577
2
        if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
578
1
            request.__set_num_as_string(true);
579
1
        } else {
580
1
            request.__set_num_as_string(false);
581
1
        }
582
2.24k
    } else {
583
2.24k
        request.__set_num_as_string(false);
584
2.24k
    }
585
2.25k
    if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
586
9
        if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
587
8
            request.__set_fuzzy_parse(true);
588
8
        } else {
589
1
            request.__set_fuzzy_parse(false);
590
1
        }
591
2.24k
    } else {
592
2.24k
        request.__set_fuzzy_parse(false);
593
2.24k
    }
594
595
2.25k
    if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
596
90
        request.__set_sequence_col(
597
90
                http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
598
90
    }
599
600
2.25k
    if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
601
4
        int parallelism = DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM),
602
2
                                              HTTP_SEND_BATCH_PARALLELISM));
603
2
        request.__set_send_batch_parallelism(parallelism);
604
2
    }
605
606
2.24k
    if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
607
8
        if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
608
7
            request.__set_load_to_single_tablet(true);
609
7
        } else {
610
1
            request.__set_load_to_single_tablet(false);
611
1
        }
612
8
    }
613
614
2.24k
    if (ctx->timeout_second != -1) {
615
167
        request.__set_timeout(ctx->timeout_second);
616
167
    }
617
2.24k
    request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
618
2.24k
    TMergeType::type merge_type = TMergeType::APPEND;
619
2.24k
    StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
620
2.24k
                                                      {"DELETE", TMergeType::DELETE},
621
2.24k
                                                      {"MERGE", TMergeType::MERGE}};
622
2.24k
    if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
623
75
        std::string merge_type_str = http_req->header(HTTP_MERGE_TYPE);
624
75
        auto iter = merge_type_map.find(merge_type_str);
625
75
        if (iter != merge_type_map.end()) {
626
74
            merge_type = iter->second;
627
74
        } else {
628
1
            return Status::InvalidArgument("Invalid merge type {}", merge_type_str);
629
1
        }
630
74
        if (merge_type == TMergeType::MERGE && http_req->header(HTTP_DELETE_CONDITION).empty()) {
631
0
            return Status::InvalidArgument("Excepted DELETE ON clause when merge type is MERGE.");
632
74
        } else if (merge_type != TMergeType::MERGE &&
633
74
                   !http_req->header(HTTP_DELETE_CONDITION).empty()) {
634
0
            return Status::InvalidArgument(
635
0
                    "Not support DELETE ON clause when merge type is not MERGE.");
636
0
        }
637
74
    }
638
2.24k
    request.__set_merge_type(merge_type);
639
2.24k
    if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
640
17
        request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION));
641
17
    }
642
643
2.24k
    if (!http_req->header(HTTP_MAX_FILTER_RATIO).empty()) {
644
328
        ctx->max_filter_ratio = strtod(http_req->header(HTTP_MAX_FILTER_RATIO).c_str(), nullptr);
645
328
        request.__set_max_filter_ratio(ctx->max_filter_ratio);
646
328
    }
647
648
2.24k
    if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
649
96
        request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS));
650
96
    }
651
2.24k
    if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) {
652
89
        if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) {
653
25
            request.__set_trim_double_quotes(true);
654
64
        } else {
655
64
            request.__set_trim_double_quotes(false);
656
64
        }
657
89
    }
658
2.24k
    if (!http_req->header(HTTP_SKIP_LINES).empty()) {
659
81
        int skip_lines = DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES));
660
81
        if (skip_lines < 0) {
661
1
            return Status::InvalidArgument("Invalid 'skip_lines': {}", skip_lines);
662
1
        }
663
80
        request.__set_skip_lines(skip_lines);
664
80
    }
665
2.24k
    if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
666
0
        if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
667
0
            request.__set_enable_profile(true);
668
0
        } else {
669
0
            request.__set_enable_profile(false);
670
0
        }
671
0
    }
672
673
2.24k
    if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
674
170
        static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
675
170
                {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
676
170
                {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
677
170
                {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
678
170
        std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
679
170
        auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
680
170
        if (iter != unique_key_update_mode_map.end()) {
681
169
            TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
682
169
            if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
683
                // check constraints when flexible partial update is enabled
684
169
                if (ctx->format != TFileFormatType::FORMAT_JSON) {
685
1
                    return Status::InvalidArgument(
686
1
                            "flexible partial update only support json format as input file "
687
1
                            "currently");
688
1
                }
689
168
                if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
690
168
                    iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
691
1
                    return Status::InvalidArgument(
692
1
                            "Don't support flexible partial update when 'fuzzy_parse' is enabled");
693
1
                }
694
167
                if (!http_req->header(HTTP_COLUMNS).empty()) {
695
1
                    return Status::InvalidArgument(
696
1
                            "Don't support flexible partial update when 'columns' is specified");
697
1
                }
698
166
                if (!http_req->header(HTTP_JSONPATHS).empty()) {
699
1
                    return Status::InvalidArgument(
700
1
                            "Don't support flexible partial update when 'jsonpaths' is specified");
701
1
                }
702
165
                if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
703
1
                    return Status::InvalidArgument(
704
1
                            "Don't support flexible partial update when 'hidden_columns' is "
705
1
                            "specified");
706
1
                }
707
164
                if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
708
0
                    return Status::InvalidArgument(
709
0
                            "Don't support flexible partial update when "
710
0
                            "'function_column.sequence_col' is specified");
711
0
                }
712
164
                if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
713
1
                    return Status::InvalidArgument(
714
1
                            "Don't support flexible partial update when "
715
1
                            "'merge_type' is specified");
716
1
                }
717
163
                if (!http_req->header(HTTP_WHERE).empty()) {
718
1
                    return Status::InvalidArgument(
719
1
                            "Don't support flexible partial update when "
720
1
                            "'where' is specified");
721
1
                }
722
163
            }
723
162
            request.__set_unique_key_update_mode(unique_key_update_mode);
724
162
        } else {
725
1
            return Status::InvalidArgument(
726
1
                    "Invalid unique_key_partial_mode {}, must be one of 'UPSERT', "
727
1
                    "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
728
1
                    unique_key_update_mode_str);
729
1
        }
730
170
    }
731
732
2.23k
    if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
733
2.23k
        !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
734
        // only consider `partial_columns` parameter when `unique_key_update_mode` is not set
735
337
        if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
736
332
            request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
737
            // for backward compatibility
738
332
            request.__set_partial_update(true);
739
332
        }
740
337
    }
741
742
2.23k
    if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) {
743
18
        static const std::map<std::string, TPartialUpdateNewRowPolicy::type> policy_map {
744
18
                {"APPEND", TPartialUpdateNewRowPolicy::APPEND},
745
18
                {"ERROR", TPartialUpdateNewRowPolicy::ERROR}};
746
747
18
        auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY);
748
18
        std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(),
749
92
                       [](unsigned char c) { return std::toupper(c); });
750
18
        auto it = policy_map.find(policy_name);
751
18
        if (it == policy_map.end()) {
752
0
            return Status::InvalidArgument(
753
0
                    "Invalid partial_update_new_key_behavior {}, must be one of {'APPEND', "
754
0
                    "'ERROR'}",
755
0
                    policy_name);
756
0
        }
757
18
        request.__set_partial_update_new_key_policy(it->second);
758
18
    }
759
760
2.23k
    if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
761
52
        bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
762
52
        request.__set_memtable_on_sink_node(value);
763
52
    }
764
2.23k
    if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) {
765
0
        int stream_per_node = DORIS_TRY(
766
0
                safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE), HTTP_LOAD_STREAM_PER_NODE));
767
0
        request.__set_stream_per_node(stream_per_node);
768
0
    }
769
2.23k
    if (ctx->group_commit) {
770
22
        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
771
22
            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
772
22
        } else {
773
            // used for wait_internal_group_commit_finish
774
0
            request.__set_group_commit_mode("sync_mode");
775
0
        }
776
22
    }
777
778
2.23k
    if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) {
779
0
        request.__set_cloud_cluster(http_req->header(HTTP_COMPUTE_GROUP));
780
2.23k
    } else if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) {
781
66
        request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
782
66
    }
783
784
2.23k
    if (!http_req->header(HTTP_EMPTY_FIELD_AS_NULL).empty()) {
785
1
        if (iequal(http_req->header(HTTP_EMPTY_FIELD_AS_NULL), "true")) {
786
1
            request.__set_empty_field_as_null(true);
787
1
        }
788
1
    }
789
790
2.23k
#ifndef BE_TEST
791
    // plan this load
792
2.23k
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
793
2.23k
    int64_t stream_load_put_start_time = MonotonicNanos();
794
2.23k
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
795
2.23k
            master_addr.hostname, master_addr.port,
796
2.23k
            [&request, ctx](FrontendServiceConnection& client) {
797
2.23k
                client->streamLoadPut(ctx->put_result, request);
798
2.23k
            }));
799
2.23k
    ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
800
#else
801
    ctx->put_result = k_stream_load_put_result;
802
#endif
803
2.23k
    Status plan_status(Status::create(ctx->put_result.status));
804
2.23k
    if (!plan_status.ok()) {
805
130
        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
806
130
        return plan_status;
807
130
    }
808
2.23k
    DCHECK(ctx->put_result.__isset.pipeline_params);
809
2.10k
    ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false);
810
2.10k
    ctx->put_result.pipeline_params.query_options.__set_enable_insert_strict(strictMode);
811
2.10k
    if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
812
1
        return Status::NotSupported("stream load 2pc is unsupported for mow table");
813
1
    }
814
2.10k
    if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
815
        // FIXME find a way to avoid chunked stream load write large WALs
816
20
        size_t content_length = 0;
817
20
        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
818
18
            try {
819
18
                content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
820
18
            } catch (const std::exception& e) {
821
0
                return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
822
0
                                               http_req->header(HttpHeaders::CONTENT_LENGTH),
823
0
                                               e.what());
824
0
            }
825
18
            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
826
18
                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
827
18
                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
828
18
                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
829
18
                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
830
18
                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
831
18
                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
832
4
                content_length *= 3;
833
4
            }
834
18
        }
835
20
        ctx->put_result.pipeline_params.__set_content_length(content_length);
836
20
    }
837
838
2.10k
    VLOG_NOTICE << "params is "
839
0
                << apache::thrift::ThriftDebugString(ctx->put_result.pipeline_params);
840
    // if we not use streaming, we must download total content before we begin
841
    // to process this load
842
2.10k
    if (!ctx->use_streaming) {
843
21
        return Status::OK();
844
21
    }
845
846
2.08k
    TPipelineFragmentParamsList mocked;
847
2.08k
    return _exec_env->stream_load_executor()->execute_plan_fragment(
848
2.08k
            ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
849
2.08k
                _on_finish(ctx, http_req);
850
2.08k
            });
851
2.10k
}
852
853
// Build a unique local path for spooling non-streaming load data.
// Asks the load path manager for a directory belonging to the request's
// database, then appends "<table>.<YYYYmmddHHMMSS>.<usec>" so that
// concurrent loads against the same table get distinct file names.
Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,
                                          int64_t file_bytes) {
    // Reserve a writable directory with enough room for file_bytes.
    std::string dir;
    RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &dir,
                                                             file_bytes));

    // Timestamp down to microseconds for uniqueness.
    timeval now;
    gettimeofday(&now, nullptr);
    time_t secs = now.tv_sec;
    struct tm local_tm;
    localtime_r(&secs, &local_tm);
    char ts_buf[64];
    strftime(ts_buf, sizeof(ts_buf), "%Y%m%d%H%M%S", &local_tm);

    std::stringstream path_builder;
    path_builder << dir << "/" << req->param(HTTP_TABLE_KEY) << "." << ts_buf << "."
                 << now.tv_usec;
    *file_path = path_builder.str();
    return Status::OK();
}
870
871
void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
872
2.13k
                                                const std::string& str) {
873
2.13k
    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
874
2.13k
            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
875
876
2.13k
    if (stream_load_recorder != nullptr) {
877
2.13k
        std::string key =
878
2.13k
                std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
879
2.13k
        auto st = stream_load_recorder->put(key, str);
880
2.13k
        if (st.ok()) {
881
2.13k
            LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label
882
2.13k
                      << ", key: " << key;
883
2.13k
        }
884
2.13k
    } else {
885
0
        LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null.";
886
0
    }
887
2.13k
}
888
889
// Decide whether this stream load should take the group commit path.
// Validates the "group_commit" header value, parses Content-Length, and
// rejects option combinations that are incompatible with group commit.
// On success, ctx->group_commit is set accordingly; for chunked requests
// ctx->label is cleared so a label can be generated later.
//
// Returns InvalidArgument on a bad header/option combination, or
// EXCEEDED_LIMIT when async_mode has no WAL space for this load size.
Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
                                              std::shared_ptr<StreamLoadContext> ctx) {
    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
        return Status::InvalidArgument(
                "group_commit can only be [async_mode, sync_mode, off_mode]");
    }
    if (config::wait_internal_group_commit_finish) {
        group_commit_mode = "sync_mode";
    }
    // Parse Content-Length defensively: the header is client-controlled and may
    // be non-numeric or out of range, in which case std::stoll throws
    // std::invalid_argument / std::out_of_range. Return InvalidArgument instead
    // of letting the exception escape — same handling this file already applies
    // to CONTENT_LENGTH in the async_mode WAL-sizing path.
    int64_t content_length = 0;
    if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
        try {
            content_length = std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
        } catch (const std::exception& e) {
            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
                                           req->header(HttpHeaders::CONTENT_LENGTH), e.what());
        }
    }
    if (content_length < 0) {
        std::stringstream ss;
        ss << "This stream load content length <0 (" << content_length
           << "), please check your content length.";
        LOG(WARNING) << ss.str();
        return Status::InvalidArgument(ss.str());
    }
    // allow chunked stream load in flink
    auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
                    req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos;
    if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
        (content_length == 0 && !is_chunk)) {
        // off_mode and empty
        ctx->group_commit = false;
        return Status::OK();
    }
    if (is_chunk) {
        // Chunked requests carry no usable user label.
        ctx->label = "";
    }

    // Options that are incompatible with group commit.
    auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
                           iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
    auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
    auto partitions = !req->header(HTTP_PARTITIONS).empty();
    auto update_mode =
            !req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
            (iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FIXED_COLUMNS") ||
             iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS"));
    if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
        !update_mode) {
        if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
            return Status::InvalidArgument("label and group_commit can't be set at the same time");
        }
        ctx->group_commit = true;
        if (iequal(group_commit_mode, "async_mode")) {
            // async_mode writes a WAL before replying; ensure there is space.
            if (!load_size_smaller_than_wal_limit(content_length)) {
                std::stringstream ss;
                ss << "There is no space for group commit stream load async WAL. This stream load "
                      "size is "
                   << content_length << ". WAL dir info: "
                   << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
                LOG(WARNING) << ss.str();
                return Status::Error<EXCEEDED_LIMIT>(ss.str());
            }
        }
    }
    return Status::OK();
}
951
952
} // namespace doris