Coverage Report

Created: 2026-04-10 10:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/stream_load.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/stream_load.h"
19
20
// use string iequal
21
#include <event2/buffer.h>
22
#include <event2/http.h>
23
#include <gen_cpp/FrontendService.h>
24
#include <gen_cpp/FrontendService_types.h>
25
#include <gen_cpp/HeartbeatService_types.h>
26
#include <gen_cpp/PaloInternalService_types.h>
27
#include <gen_cpp/PlanNodes_types.h>
28
#include <gen_cpp/Types_types.h>
29
#include <sys/time.h>
30
#include <thrift/protocol/TDebugProtocol.h>
31
32
#include <algorithm>
33
#include <cstdint>
34
#include <cstdlib>
35
#include <ctime>
36
#include <functional>
37
#include <future>
38
#include <sstream>
39
#include <stdexcept>
40
#include <utility>
41
42
#include "cloud/config.h"
43
#include "common/config.h"
44
#include "common/consts.h"
45
#include "common/logging.h"
46
#include "common/metrics/doris_metrics.h"
47
#include "common/metrics/metrics.h"
48
#include "common/status.h"
49
#include "common/utils.h"
50
#include "io/fs/stream_load_pipe.h"
51
#include "load/group_commit/group_commit_mgr.h"
52
#include "load/load_path_mgr.h"
53
#include "load/message_body_sink.h"
54
#include "load/stream_load/new_load_stream_mgr.h"
55
#include "load/stream_load/stream_load_context.h"
56
#include "load/stream_load/stream_load_executor.h"
57
#include "load/stream_load/stream_load_recorder.h"
58
#include "runtime/exec_env.h"
59
#include "service/http/http_channel.h"
60
#include "service/http/http_common.h"
61
#include "service/http/http_headers.h"
62
#include "service/http/http_request.h"
63
#include "service/http/utils.h"
64
#include "storage/storage_engine.h"
65
#include "util/byte_buffer.h"
66
#include "util/client_cache.h"
67
#include "util/load_util.h"
68
#include "util/string_util.h"
69
#include "util/thrift_rpc_helper.h"
70
#include "util/time.h"
71
#include "util/uid_util.h"
72
#include "util/url_coding.h"
73
74
namespace doris {
75
using namespace ErrorCode;
76
77
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::REQUESTS);
78
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
79
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
80
81
bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
82
bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load",
83
                                                                  "commit_and_publish_ms");
84
85
static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
86
static const std::string CHUNK = "chunked";
87
static const std::string OFF_MODE = "off_mode";
88
static const std::string SYNC_MODE = "sync_mode";
89
static const std::string ASYNC_MODE = "async_mode";
90
91
#ifdef BE_TEST
92
TStreamLoadPutResult k_stream_load_put_result;
93
#endif
94
95
StreamLoadAction::StreamLoadAction(ExecEnv* exec_env)
96
1
        : HttpHandlerWithAuth(exec_env, TPrivilegeHier::GLOBAL, TPrivilegeType::LOAD) {
97
    // Use LOAD privilege type: requires LOAD permission
98
1
    _stream_load_entity =
99
1
            DorisMetrics::instance()->metric_registry()->register_entity("stream_load");
100
1
    INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_requests_total);
101
1
    INT_COUNTER_METRIC_REGISTER(_stream_load_entity, streaming_load_duration_ms);
102
1
    INT_GAUGE_METRIC_REGISTER(_stream_load_entity, streaming_load_current_processing);
103
1
}
104
105
1
StreamLoadAction::~StreamLoadAction() {
106
1
    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_entity);
107
1
}
108
109
0
void StreamLoadAction::handle(HttpRequest* req) {
110
0
    std::shared_ptr<StreamLoadContext> ctx =
111
0
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
112
0
    if (ctx == nullptr) {
113
0
        return;
114
0
    }
115
116
0
    {
117
0
        std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
118
0
        ctx->_can_send_reply = true;
119
0
        ctx->_can_send_reply_cv.notify_all();
120
0
    }
121
122
    // status already set to fail
123
0
    if (ctx->status.ok()) {
124
0
        ctx->status = _handle(ctx, req);
125
0
        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
126
0
            _send_reply(ctx, req);
127
0
        }
128
0
    }
129
0
}
130
131
0
Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
132
0
    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
133
0
        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
134
0
                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
135
0
        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
136
0
    }
137
138
    // if we use non-streaming, MessageBodyFileSink.finish will close the file
139
0
    RETURN_IF_ERROR(ctx->body_sink->finish());
140
0
    if (!ctx->use_streaming) {
141
        // we need to close file first, then execute_plan_fragment here
142
0
        ctx->body_sink.reset();
143
0
        TPipelineFragmentParamsList mocked;
144
0
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
145
0
                ctx, mocked,
146
0
                [req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); }));
147
0
    }
148
149
0
    return Status::OK();
150
0
}
151
152
0
void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
153
0
    ctx->status = ctx->load_status_future.get();
154
0
    if (ctx->status.ok()) {
155
0
        if (ctx->group_commit) {
156
0
            LOG(INFO) << "skip commit because this is group commit, pipe_id="
157
0
                      << ctx->id.to_string();
158
0
        } else if (ctx->two_phase_commit) {
159
0
            int64_t pre_commit_start_time = MonotonicNanos();
160
0
            ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
161
0
            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
162
0
        } else {
163
            // If put file success we need commit this load
164
0
            int64_t commit_and_publish_start_time = MonotonicNanos();
165
0
            ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
166
0
            ctx->commit_and_publish_txn_cost_nanos =
167
0
                    MonotonicNanos() - commit_and_publish_start_time;
168
0
            g_stream_load_commit_and_publish_latency_ms
169
0
                    << ctx->commit_and_publish_txn_cost_nanos / 1000000;
170
0
        }
171
0
    }
172
0
    _send_reply(ctx, req);
173
0
}
174
175
0
void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
176
0
    std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
177
    // 1. _can_send_reply: ensure `send_reply` is invoked only after on_header/handle complete,
178
    //    avoid client errors (e.g., broken pipe).
179
    // 2. _finish_send_reply: Prevent duplicate reply sending; skip reply if HTTP request is canceled
180
    //    due to long import execution time.
181
0
    while (!ctx->_finish_send_reply && !ctx->_can_send_reply) {
182
0
        ctx->_can_send_reply_cv.wait(lock1);
183
0
    }
184
0
    if (ctx->_finish_send_reply) {
185
0
        return;
186
0
    }
187
0
    DCHECK(ctx->_can_send_reply);
188
0
    ctx->_finish_send_reply = true;
189
0
    ctx->_can_send_reply_cv.notify_all();
190
0
    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
191
192
0
    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
193
0
        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
194
0
                     << ", errmsg=" << ctx->status;
195
0
        if (ctx->need_rollback) {
196
0
            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
197
0
            ctx->need_rollback = false;
198
0
        }
199
0
        if (ctx->body_sink != nullptr) {
200
0
            ctx->body_sink->cancel(ctx->status.to_string());
201
0
        }
202
0
    }
203
204
0
    auto str = ctx->to_json();
205
    // add new line at end
206
0
    str = str + '\n';
207
208
#ifndef BE_TEST
209
    if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
210
        if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
211
            str = ctx->prepare_stream_load_record(str);
212
            _save_stream_load_record(ctx, str);
213
        }
214
    }
215
#endif
216
217
0
    HttpChannel::send_reply(req, str);
218
219
0
    LOG(INFO) << "finished to execute stream load. label=" << ctx->label
220
0
              << ", txn_id=" << ctx->txn_id << ", query_id=" << ctx->id
221
0
              << ", load_cost_ms=" << ctx->load_cost_millis << ", receive_data_cost_ms="
222
0
              << (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
223
0
              << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
224
0
              << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000
225
0
              << ", commit_and_publish_txn_cost_ms="
226
0
              << ctx->commit_and_publish_txn_cost_nanos / 1000000
227
0
              << ", number_total_rows=" << ctx->number_total_rows
228
0
              << ", number_loaded_rows=" << ctx->number_loaded_rows
229
0
              << ", receive_bytes=" << ctx->receive_bytes << ", loaded_bytes=" << ctx->loaded_bytes
230
0
              << ", error_url=" << ctx->error_url;
231
232
    // update statistics
233
0
    streaming_load_requests_total->increment(1);
234
0
    streaming_load_duration_ms->increment(ctx->load_cost_millis);
235
0
    if (!ctx->data_saved_path.empty()) {
236
0
        _exec_env->load_path_mgr()->clean_tmp_files(ctx->data_saved_path);
237
0
    }
238
0
}
239
240
0
int StreamLoadAction::on_header(HttpRequest* req) {
241
    // Call parent's auth check first
242
0
    int ret = HttpHandlerWithAuth::on_header(req);
243
0
    if (ret != 0) {
244
0
        return ret; // Auth failed, return error
245
0
    }
246
247
    // Continue with stream load specific header processing
248
0
    req->mark_send_reply();
249
250
0
    streaming_load_current_processing->increment(1);
251
252
0
    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
253
0
    req->set_handler_ctx(ctx);
254
255
0
    ctx->load_type = TLoadType::MANUL_LOAD;
256
0
    ctx->load_src_type = TLoadSourceType::RAW;
257
258
0
    url_decode(req->param(HTTP_DB_KEY), &ctx->db);
259
0
    url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
260
0
    ctx->label = req->header(HTTP_LABEL_KEY);
261
0
    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
262
0
    Status st = _handle_group_commit(req, ctx);
263
0
    if (!ctx->group_commit && ctx->label.empty()) {
264
0
        ctx->label = generate_uuid_string();
265
0
    }
266
267
0
    LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
268
0
              << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit
269
0
              << ", group_commit_mode=" << ctx->group_commit_mode
270
0
              << ", HTTP headers=" << req->get_all_headers();
271
0
    ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
272
273
0
    if (st.ok()) {
274
0
        st = _on_header(req, ctx);
275
0
        LOG(INFO) << "finished to handle HTTP header, " << ctx->brief();
276
0
    }
277
0
    if (!st.ok()) {
278
0
        ctx->status = std::move(st);
279
0
        {
280
0
            std::unique_lock<std::mutex> lock1(ctx->_send_reply_lock);
281
0
            ctx->_can_send_reply = true;
282
0
            ctx->_can_send_reply_cv.notify_all();
283
0
        }
284
0
        _send_reply(ctx, req);
285
0
        return -1;
286
0
    }
287
0
    return 0;
288
0
}
289
290
0
Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
291
    // auth information
292
0
    if (!parse_basic_auth(*http_req, &ctx->auth)) {
293
0
        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
294
0
        return Status::NotAuthorized("no valid Basic authorization");
295
0
    }
296
297
    // get format of this put
298
0
    std::string format_str = http_req->header(HTTP_FORMAT_KEY);
299
0
    if (iequal(format_str, BeConsts::CSV_WITH_NAMES) ||
300
0
        iequal(format_str, BeConsts::CSV_WITH_NAMES_AND_TYPES)) {
301
0
        ctx->header_type = format_str;
302
        //treat as CSV
303
0
        format_str = BeConsts::CSV;
304
0
    }
305
0
    LoadUtil::parse_format(format_str, http_req->header(HTTP_COMPRESS_TYPE), &ctx->format,
306
0
                           &ctx->compress_type);
307
0
    if (ctx->format == TFileFormatType::FORMAT_UNKNOWN) {
308
0
        return Status::Error<ErrorCode::DATA_FILE_TYPE_ERROR>("unknown data format, format={}",
309
0
                                                              http_req->header(HTTP_FORMAT_KEY));
310
0
    }
311
312
    // check content length
313
0
    ctx->body_bytes = 0;
314
0
    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
315
0
    size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
316
0
    bool read_json_by_line = false;
317
0
    if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
318
0
        if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
319
0
            read_json_by_line = true;
320
0
        }
321
0
    }
322
0
    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
323
0
        try {
324
0
            ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
325
0
        } catch (const std::exception& e) {
326
0
            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
327
0
                                           http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
328
0
        }
329
        // json max body size
330
0
        if ((ctx->format == TFileFormatType::FORMAT_JSON) &&
331
0
            (ctx->body_bytes > json_max_body_bytes) && !read_json_by_line) {
332
0
            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
333
0
                    "json body size {} exceed BE's conf `streaming_load_json_max_mb` {}. increase "
334
0
                    "it if you are sure this load is reasonable",
335
0
                    ctx->body_bytes, json_max_body_bytes);
336
0
        }
337
        // csv max body size
338
0
        else if (ctx->body_bytes > csv_max_body_bytes) {
339
0
            LOG(WARNING) << "body exceed max size." << ctx->brief();
340
0
            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
341
0
                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
342
0
                    "are sure this load is reasonable",
343
0
                    ctx->body_bytes, csv_max_body_bytes);
344
0
        }
345
0
    } else {
346
#ifndef BE_TEST
347
        evhttp_connection_set_max_body_size(
348
                evhttp_request_get_connection(http_req->get_evhttp_request()), csv_max_body_bytes);
349
#endif
350
0
    }
351
352
0
    if (!http_req->header(HttpHeaders::TRANSFER_ENCODING).empty()) {
353
0
        if (http_req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos) {
354
0
            ctx->is_chunked_transfer = true;
355
0
        }
356
0
    }
357
0
    if (UNLIKELY((http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
358
0
                  !ctx->is_chunked_transfer))) {
359
0
        LOG(WARNING) << "content_length is empty and transfer-encoding!=chunked, please set "
360
0
                        "content_length or transfer-encoding=chunked";
361
0
        return Status::InvalidArgument(
362
0
                "content_length is empty and transfer-encoding!=chunked, please set content_length "
363
0
                "or transfer-encoding=chunked");
364
0
    } else if (UNLIKELY(!http_req->header(HttpHeaders::CONTENT_LENGTH).empty() &&
365
0
                        ctx->is_chunked_transfer)) {
366
0
        LOG(WARNING) << "please do not set both content_length and transfer-encoding";
367
0
        return Status::InvalidArgument(
368
0
                "please do not set both content_length and transfer-encoding");
369
0
    }
370
371
0
    if (!http_req->header(HTTP_TIMEOUT).empty()) {
372
0
        ctx->timeout_second = DORIS_TRY(safe_stoi(http_req->header(HTTP_TIMEOUT), HTTP_TIMEOUT));
373
0
    }
374
0
    if (!http_req->header(HTTP_COMMENT).empty()) {
375
0
        ctx->load_comment = http_req->header(HTTP_COMMENT);
376
0
    }
377
    // begin transaction
378
0
    if (!ctx->group_commit) {
379
0
        int64_t begin_txn_start_time = MonotonicNanos();
380
0
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get()));
381
0
        ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time;
382
0
        if (ctx->group_commit) {
383
0
            RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes));
384
0
        }
385
0
    }
386
387
    // process put file
388
0
    return _process_put(http_req, ctx);
389
0
}
390
391
0
void StreamLoadAction::on_chunk_data(HttpRequest* req) {
392
0
    std::shared_ptr<StreamLoadContext> ctx =
393
0
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
394
0
    if (ctx == nullptr || !ctx->status.ok()) {
395
0
        return;
396
0
    }
397
398
0
    struct evhttp_request* ev_req = req->get_evhttp_request();
399
0
    auto evbuf = evhttp_request_get_input_buffer(ev_req);
400
401
0
    SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
402
403
0
    int64_t start_read_data_time = MonotonicNanos();
404
0
    while (evbuffer_get_length(evbuf) > 0) {
405
0
        ByteBufferPtr bb;
406
0
        Status st = ByteBuffer::allocate(128 * 1024, &bb);
407
0
        if (!st.ok()) {
408
0
            ctx->status = st;
409
0
            return;
410
0
        }
411
0
        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
412
0
        bb->pos = remove_bytes;
413
0
        bb->flip();
414
0
        st = ctx->body_sink->append(bb);
415
0
        if (!st.ok()) {
416
0
            LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
417
0
            ctx->status = st;
418
0
            return;
419
0
        }
420
0
        ctx->receive_bytes += remove_bytes;
421
0
    }
422
0
    int64_t read_data_time = MonotonicNanos() - start_read_data_time;
423
0
    int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
424
0
    ctx->read_data_cost_nanos += read_data_time;
425
0
    ctx->receive_and_read_data_cost_nanos =
426
0
            MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
427
0
    g_stream_load_receive_data_latency_ms
428
0
            << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
429
0
                read_data_time) /
430
0
                       1000000;
431
0
}
432
433
0
void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
434
0
    std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param);
435
0
    if (ctx == nullptr) {
436
0
        return;
437
0
    }
438
    // sender is gone, make receiver know it
439
0
    if (ctx->body_sink != nullptr) {
440
0
        ctx->body_sink->cancel("sender is gone");
441
0
    }
442
    // remove stream load context from stream load manager and the resource will be released
443
0
    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
444
0
    streaming_load_current_processing->increment(-1);
445
0
}
446
447
Status StreamLoadAction::_process_put(HttpRequest* http_req,
448
0
                                      std::shared_ptr<StreamLoadContext> ctx) {
449
    // Now we use stream
450
0
    ctx->use_streaming = LoadUtil::is_format_support_streaming(ctx->format);
451
452
    // put request
453
0
    TStreamLoadPutRequest request;
454
0
    set_request_auth(&request, ctx->auth);
455
0
    request.db = ctx->db;
456
0
    request.tbl = ctx->table;
457
0
    request.txnId = ctx->txn_id;
458
0
    request.formatType = ctx->format;
459
0
    request.__set_compress_type(ctx->compress_type);
460
0
    request.__set_header_type(ctx->header_type);
461
0
    request.__set_loadId(ctx->id.to_thrift());
462
0
    if (ctx->use_streaming) {
463
0
        std::shared_ptr<io::StreamLoadPipe> pipe;
464
0
        if (ctx->is_chunked_transfer) {
465
0
            pipe = std::make_shared<io::StreamLoadPipe>(
466
0
                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */);
467
0
            pipe->set_is_chunked_transfer(true);
468
0
        } else {
469
0
            pipe = std::make_shared<io::StreamLoadPipe>(
470
0
                    io::kMaxPipeBufferedBytes /* max_buffered_bytes */,
471
0
                    MIN_CHUNK_SIZE /* min_chunk_size */, ctx->body_bytes /* total_length */);
472
0
        }
473
0
        request.fileType = TFileType::FILE_STREAM;
474
0
        ctx->body_sink = pipe;
475
0
        ctx->pipe = pipe;
476
0
        RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
477
0
    } else {
478
0
        RETURN_IF_ERROR(_data_saved_path(http_req, &request.path, ctx->body_bytes));
479
0
        auto file_sink = std::make_shared<MessageBodyFileSink>(request.path);
480
0
        RETURN_IF_ERROR(file_sink->open());
481
0
        request.__isset.path = true;
482
0
        request.fileType = TFileType::FILE_LOCAL;
483
0
        request.__set_file_size(ctx->body_bytes);
484
0
        ctx->body_sink = file_sink;
485
0
        ctx->data_saved_path = request.path;
486
0
    }
487
0
    if (!http_req->header(HTTP_COLUMNS).empty()) {
488
0
        request.__set_columns(http_req->header(HTTP_COLUMNS));
489
0
    }
490
0
    if (!http_req->header(HTTP_WHERE).empty()) {
491
0
        request.__set_where(http_req->header(HTTP_WHERE));
492
0
    }
493
0
    if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
494
0
        request.__set_columnSeparator(http_req->header(HTTP_COLUMN_SEPARATOR));
495
0
    }
496
0
    if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
497
0
        request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
498
0
    }
499
0
    if (!http_req->header(HTTP_ENCLOSE).empty() && !http_req->header(HTTP_ENCLOSE).empty()) {
500
0
        const auto& enclose_str = http_req->header(HTTP_ENCLOSE);
501
0
        if (enclose_str.length() != 1) {
502
0
            return Status::InvalidArgument("enclose must be single-char, actually is {}",
503
0
                                           enclose_str);
504
0
        }
505
0
        request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
506
0
    }
507
0
    if (!http_req->header(HTTP_ESCAPE).empty() && !http_req->header(HTTP_ESCAPE).empty()) {
508
0
        const auto& escape_str = http_req->header(HTTP_ESCAPE);
509
0
        if (escape_str.length() != 1) {
510
0
            return Status::InvalidArgument("escape must be single-char, actually is {}",
511
0
                                           escape_str);
512
0
        }
513
0
        request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
514
0
    }
515
0
    if (!http_req->header(HTTP_PARTITIONS).empty()) {
516
0
        request.__set_partitions(http_req->header(HTTP_PARTITIONS));
517
0
        request.__set_isTempPartition(false);
518
0
        if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
519
0
            return Status::InvalidArgument(
520
0
                    "Can not specify both partitions and temporary partitions");
521
0
        }
522
0
    }
523
0
    if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
524
0
        request.__set_partitions(http_req->header(HTTP_TEMP_PARTITIONS));
525
0
        request.__set_isTempPartition(true);
526
0
        if (!http_req->header(HTTP_PARTITIONS).empty()) {
527
0
            return Status::InvalidArgument(
528
0
                    "Can not specify both partitions and temporary partitions");
529
0
        }
530
0
    }
531
0
    if (!http_req->header(HTTP_NEGATIVE).empty() && http_req->header(HTTP_NEGATIVE) == "true") {
532
0
        request.__set_negative(true);
533
0
    } else {
534
0
        request.__set_negative(false);
535
0
    }
536
0
    bool strictMode = false;
537
0
    if (!http_req->header(HTTP_STRICT_MODE).empty()) {
538
0
        if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
539
0
            strictMode = false;
540
0
        } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
541
0
            strictMode = true;
542
0
        } else {
543
0
            return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
544
0
        }
545
0
        request.__set_strictMode(strictMode);
546
0
    }
547
    // timezone first. if not, try system time_zone
548
0
    if (!http_req->header(HTTP_TIMEZONE).empty()) {
549
0
        request.__set_timezone(http_req->header(HTTP_TIMEZONE));
550
0
    } else if (!http_req->header(HTTP_TIME_ZONE).empty()) {
551
0
        request.__set_timezone(http_req->header(HTTP_TIME_ZONE));
552
0
    }
553
0
    if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
554
0
        try {
555
0
            request.__set_execMemLimit(std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT)));
556
0
        } catch (const std::invalid_argument& e) {
557
0
            return Status::InvalidArgument("Invalid mem limit format, {}", e.what());
558
0
        }
559
0
    }
560
0
    if (!http_req->header(HTTP_JSONPATHS).empty()) {
561
0
        request.__set_jsonpaths(http_req->header(HTTP_JSONPATHS));
562
0
    }
563
0
    if (!http_req->header(HTTP_JSONROOT).empty()) {
564
0
        request.__set_json_root(http_req->header(HTTP_JSONROOT));
565
0
    }
566
0
    if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
567
0
        if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
568
0
            request.__set_strip_outer_array(true);
569
0
        } else {
570
0
            request.__set_strip_outer_array(false);
571
0
        }
572
0
    } else {
573
0
        request.__set_strip_outer_array(false);
574
0
    }
575
576
0
    if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
577
0
        if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
578
0
            request.__set_read_json_by_line(true);
579
0
        } else {
580
0
            request.__set_read_json_by_line(false);
581
0
        }
582
0
    } else {
583
0
        request.__set_read_json_by_line(false);
584
0
    }
585
586
0
    if (http_req->header(HTTP_READ_JSON_BY_LINE).empty() &&
587
0
        http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
588
0
        request.__set_read_json_by_line(true);
589
0
        request.__set_strip_outer_array(false);
590
0
    }
591
592
0
    if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
593
0
        if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
594
0
            request.__set_num_as_string(true);
595
0
        } else {
596
0
            request.__set_num_as_string(false);
597
0
        }
598
0
    } else {
599
0
        request.__set_num_as_string(false);
600
0
    }
601
0
    if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
602
0
        if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
603
0
            request.__set_fuzzy_parse(true);
604
0
        } else {
605
0
            request.__set_fuzzy_parse(false);
606
0
        }
607
0
    } else {
608
0
        request.__set_fuzzy_parse(false);
609
0
    }
610
611
0
    if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
612
0
        request.__set_sequence_col(
613
0
                http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL));
614
0
    }
615
616
0
    if (!http_req->header(HTTP_SEND_BATCH_PARALLELISM).empty()) {
617
0
        int parallelism = DORIS_TRY(safe_stoi(http_req->header(HTTP_SEND_BATCH_PARALLELISM),
618
0
                                              HTTP_SEND_BATCH_PARALLELISM));
619
0
        request.__set_send_batch_parallelism(parallelism);
620
0
    }
621
622
0
    if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
623
0
        if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
624
0
            request.__set_load_to_single_tablet(true);
625
0
        } else {
626
0
            request.__set_load_to_single_tablet(false);
627
0
        }
628
0
    }
629
630
0
    if (ctx->timeout_second != -1) {
631
0
        request.__set_timeout(ctx->timeout_second);
632
0
    }
633
0
    request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
634
0
    TMergeType::type merge_type = TMergeType::APPEND;
635
0
    StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
636
0
                                                      {"DELETE", TMergeType::DELETE},
637
0
                                                      {"MERGE", TMergeType::MERGE}};
638
0
    if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
639
0
        std::string merge_type_str = http_req->header(HTTP_MERGE_TYPE);
640
0
        auto iter = merge_type_map.find(merge_type_str);
641
0
        if (iter != merge_type_map.end()) {
642
0
            merge_type = iter->second;
643
0
        } else {
644
0
            return Status::InvalidArgument("Invalid merge type {}", merge_type_str);
645
0
        }
646
0
        if (merge_type == TMergeType::MERGE && http_req->header(HTTP_DELETE_CONDITION).empty()) {
647
0
            return Status::InvalidArgument("Excepted DELETE ON clause when merge type is MERGE.");
648
0
        } else if (merge_type != TMergeType::MERGE &&
649
0
                   !http_req->header(HTTP_DELETE_CONDITION).empty()) {
650
0
            return Status::InvalidArgument(
651
0
                    "Not support DELETE ON clause when merge type is not MERGE.");
652
0
        }
653
0
    }
654
0
    request.__set_merge_type(merge_type);
655
0
    if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
656
0
        request.__set_delete_condition(http_req->header(HTTP_DELETE_CONDITION));
657
0
    }
658
659
0
    if (!http_req->header(HTTP_MAX_FILTER_RATIO).empty()) {
660
0
        ctx->max_filter_ratio = strtod(http_req->header(HTTP_MAX_FILTER_RATIO).c_str(), nullptr);
661
0
        request.__set_max_filter_ratio(ctx->max_filter_ratio);
662
0
    }
663
664
0
    if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
665
0
        request.__set_hidden_columns(http_req->header(HTTP_HIDDEN_COLUMNS));
666
0
    }
667
0
    if (!http_req->header(HTTP_TRIM_DOUBLE_QUOTES).empty()) {
668
0
        if (iequal(http_req->header(HTTP_TRIM_DOUBLE_QUOTES), "true")) {
669
0
            request.__set_trim_double_quotes(true);
670
0
        } else {
671
0
            request.__set_trim_double_quotes(false);
672
0
        }
673
0
    }
674
0
    if (!http_req->header(HTTP_SKIP_LINES).empty()) {
675
0
        int skip_lines = DORIS_TRY(safe_stoi(http_req->header(HTTP_SKIP_LINES), HTTP_SKIP_LINES));
676
0
        if (skip_lines < 0) {
677
0
            return Status::InvalidArgument("Invalid 'skip_lines': {}", skip_lines);
678
0
        }
679
0
        request.__set_skip_lines(skip_lines);
680
0
    }
681
0
    if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
682
0
        if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
683
0
            request.__set_enable_profile(true);
684
0
        } else {
685
0
            request.__set_enable_profile(false);
686
0
        }
687
0
    }
688
689
0
    if (!http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty()) {
690
0
        static const StringCaseMap<TUniqueKeyUpdateMode::type> unique_key_update_mode_map = {
691
0
                {"UPSERT", TUniqueKeyUpdateMode::UPSERT},
692
0
                {"UPDATE_FIXED_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS},
693
0
                {"UPDATE_FLEXIBLE_COLUMNS", TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS}};
694
0
        std::string unique_key_update_mode_str = http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE);
695
0
        auto iter = unique_key_update_mode_map.find(unique_key_update_mode_str);
696
0
        if (iter != unique_key_update_mode_map.end()) {
697
0
            TUniqueKeyUpdateMode::type unique_key_update_mode = iter->second;
698
0
            if (unique_key_update_mode == TUniqueKeyUpdateMode::UPDATE_FLEXIBLE_COLUMNS) {
699
                // check constraints when flexible partial update is enabled
700
0
                if (ctx->format != TFileFormatType::FORMAT_JSON) {
701
0
                    return Status::InvalidArgument(
702
0
                            "flexible partial update only support json format as input file "
703
0
                            "currently");
704
0
                }
705
0
                if (!http_req->header(HTTP_FUZZY_PARSE).empty() &&
706
0
                    iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
707
0
                    return Status::InvalidArgument(
708
0
                            "Don't support flexible partial update when 'fuzzy_parse' is enabled");
709
0
                }
710
0
                if (!http_req->header(HTTP_COLUMNS).empty()) {
711
0
                    return Status::InvalidArgument(
712
0
                            "Don't support flexible partial update when 'columns' is specified");
713
0
                }
714
0
                if (!http_req->header(HTTP_JSONPATHS).empty()) {
715
0
                    return Status::InvalidArgument(
716
0
                            "Don't support flexible partial update when 'jsonpaths' is specified");
717
0
                }
718
0
                if (!http_req->header(HTTP_HIDDEN_COLUMNS).empty()) {
719
0
                    return Status::InvalidArgument(
720
0
                            "Don't support flexible partial update when 'hidden_columns' is "
721
0
                            "specified");
722
0
                }
723
0
                if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
724
0
                    return Status::InvalidArgument(
725
0
                            "Don't support flexible partial update when "
726
0
                            "'function_column.sequence_col' is specified");
727
0
                }
728
0
                if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
729
0
                    return Status::InvalidArgument(
730
0
                            "Don't support flexible partial update when "
731
0
                            "'merge_type' is specified");
732
0
                }
733
0
                if (!http_req->header(HTTP_WHERE).empty()) {
734
0
                    return Status::InvalidArgument(
735
0
                            "Don't support flexible partial update when "
736
0
                            "'where' is specified");
737
0
                }
738
0
            }
739
0
            request.__set_unique_key_update_mode(unique_key_update_mode);
740
0
        } else {
741
0
            return Status::InvalidArgument(
742
0
                    "Invalid unique_key_partial_mode {}, must be one of 'UPSERT', "
743
0
                    "'UPDATE_FIXED_COLUMNS' or 'UPDATE_FLEXIBLE_COLUMNS'",
744
0
                    unique_key_update_mode_str);
745
0
        }
746
0
    }
747
748
0
    if (http_req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
749
0
        !http_req->header(HTTP_PARTIAL_COLUMNS).empty()) {
750
        // only consider `partial_columns` parameter when `unique_key_update_mode` is not set
751
0
        if (iequal(http_req->header(HTTP_PARTIAL_COLUMNS), "true")) {
752
0
            request.__set_unique_key_update_mode(TUniqueKeyUpdateMode::UPDATE_FIXED_COLUMNS);
753
            // for backward compatibility
754
0
            request.__set_partial_update(true);
755
0
        }
756
0
    }
757
758
0
    if (!http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY).empty()) {
759
0
        static const std::map<std::string, TPartialUpdateNewRowPolicy::type> policy_map {
760
0
                {"APPEND", TPartialUpdateNewRowPolicy::APPEND},
761
0
                {"ERROR", TPartialUpdateNewRowPolicy::ERROR}};
762
763
0
        auto policy_name = http_req->header(HTTP_PARTIAL_UPDATE_NEW_ROW_POLICY);
764
0
        std::transform(policy_name.begin(), policy_name.end(), policy_name.begin(),
765
0
                       [](unsigned char c) { return std::toupper(c); });
766
0
        auto it = policy_map.find(policy_name);
767
0
        if (it == policy_map.end()) {
768
0
            return Status::InvalidArgument(
769
0
                    "Invalid partial_update_new_key_behavior {}, must be one of {'APPEND', "
770
0
                    "'ERROR'}",
771
0
                    policy_name);
772
0
        }
773
0
        request.__set_partial_update_new_key_policy(it->second);
774
0
    }
775
776
0
    if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
777
0
        bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
778
0
        request.__set_memtable_on_sink_node(value);
779
0
    }
780
0
    if (!http_req->header(HTTP_LOAD_STREAM_PER_NODE).empty()) {
781
0
        int stream_per_node = DORIS_TRY(
782
0
                safe_stoi(http_req->header(HTTP_LOAD_STREAM_PER_NODE), HTTP_LOAD_STREAM_PER_NODE));
783
0
        request.__set_stream_per_node(stream_per_node);
784
0
    }
785
0
    if (ctx->group_commit) {
786
0
        request.__set_group_commit_mode(ctx->group_commit_mode);
787
0
    }
788
789
0
    if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) {
790
0
        request.__set_cloud_cluster(http_req->header(HTTP_COMPUTE_GROUP));
791
0
    } else if (!http_req->header(HTTP_CLOUD_CLUSTER).empty()) {
792
0
        request.__set_cloud_cluster(http_req->header(HTTP_CLOUD_CLUSTER));
793
0
    }
794
795
0
    if (!http_req->header(HTTP_EMPTY_FIELD_AS_NULL).empty()) {
796
0
        if (iequal(http_req->header(HTTP_EMPTY_FIELD_AS_NULL), "true")) {
797
0
            request.__set_empty_field_as_null(true);
798
0
        }
799
0
    }
800
801
#ifndef BE_TEST
802
    // plan this load
803
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
804
    int64_t stream_load_put_start_time = MonotonicNanos();
805
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
806
            master_addr.hostname, master_addr.port,
807
            [&request, ctx](FrontendServiceConnection& client) {
808
                client->streamLoadPut(ctx->put_result, request);
809
            }));
810
    ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
811
#else
812
0
    ctx->put_result = k_stream_load_put_result;
813
0
#endif
814
0
    Status plan_status(Status::create(ctx->put_result.status));
815
0
    if (!plan_status.ok()) {
816
0
        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
817
0
        return plan_status;
818
0
    }
819
0
    DCHECK(ctx->put_result.__isset.pipeline_params);
820
0
    ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false);
821
0
    ctx->put_result.pipeline_params.query_options.__set_enable_insert_strict(strictMode);
822
0
    if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
823
0
        return Status::NotSupported("stream load 2pc is unsupported for mow table");
824
0
    }
825
0
    if (iequal(ctx->group_commit_mode, ASYNC_MODE)) {
826
        // FIXME find a way to avoid chunked stream load write large WALs
827
0
        size_t content_length = 0;
828
0
        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
829
0
            try {
830
0
                content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
831
0
            } catch (const std::exception& e) {
832
0
                return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
833
0
                                               http_req->header(HttpHeaders::CONTENT_LENGTH),
834
0
                                               e.what());
835
0
            }
836
0
            if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
837
0
                ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
838
0
                ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
839
0
                ctx->format == TFileFormatType::FORMAT_CSV_LZ4FRAME ||
840
0
                ctx->format == TFileFormatType::FORMAT_CSV_LZOP ||
841
0
                ctx->format == TFileFormatType::FORMAT_CSV_LZ4BLOCK ||
842
0
                ctx->format == TFileFormatType::FORMAT_CSV_SNAPPYBLOCK) {
843
0
                content_length *= 3;
844
0
            }
845
0
        }
846
0
        ctx->put_result.pipeline_params.__set_content_length(content_length);
847
0
    }
848
849
0
    VLOG_NOTICE << "params is "
850
0
                << apache::thrift::ThriftDebugString(ctx->put_result.pipeline_params);
851
    // if we not use streaming, we must download total content before we begin
852
    // to process this load
853
0
    if (!ctx->use_streaming) {
854
0
        return Status::OK();
855
0
    }
856
857
0
    TPipelineFragmentParamsList mocked;
858
0
    return _exec_env->stream_load_executor()->execute_plan_fragment(
859
0
            ctx, mocked, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
860
0
                _on_finish(ctx, http_req);
861
0
            });
862
0
}
863
864
Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path,
                                          int64_t file_bytes) {
    // Allocate a load directory for the request's database, then build a
    // unique file name of the form <dir>/<table>.<YYYYmmddHHMMSS>.<usec>.
    // The microsecond suffix keeps concurrent loads on the same table from
    // colliding within the same second.
    std::string dir;
    RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(req->param(HTTP_DB_KEY), "", &dir,
                                                             file_bytes));
    timeval now;
    gettimeofday(&now, nullptr);
    time_t cur_sec = now.tv_sec;
    struct tm local_tm;
    localtime_r(&cur_sec, &local_tm);
    char ts_buf[64];
    strftime(ts_buf, 64, "%Y%m%d%H%M%S", &local_tm);
    std::stringstream path_ss;
    path_ss << dir << "/" << req->param(HTTP_TABLE_KEY) << "." << ts_buf << "." << now.tv_usec;
    *file_path = path_ss.str();
    return Status::OK();
}
881
882
void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
883
0
                                                const std::string& str) {
884
0
    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
885
0
            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
886
887
0
    if (stream_load_recorder != nullptr) {
888
0
        std::string key =
889
0
                std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
890
0
        auto st = stream_load_recorder->put(key, str);
891
0
        if (st.ok()) {
892
0
            LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label
893
0
                      << ", key: " << key;
894
0
        }
895
0
    } else {
896
0
        LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null.";
897
0
    }
898
0
}
899
900
Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode,
                                          int64_t content_length) {
    // Only async group commit writes a WAL, so only that mode needs a
    // free-space check; every other mode is accepted unconditionally.
    if (!iequal(group_commit_mode, ASYNC_MODE)) {
        return Status::OK();
    }
    if (load_size_smaller_than_wal_limit(content_length)) {
        return Status::OK();
    }
    // Not enough WAL space for a load of this size: reject with details.
    std::stringstream msg;
    msg << "There is no space for group commit stream load async WAL. This stream load "
           "size is "
        << content_length
        << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
    LOG(WARNING) << msg.str();
    return Status::Error<EXCEEDED_LIMIT>(msg.str());
}
914
915
Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx,
                                           std::string& group_commit_header,
                                           bool& can_group_commit) {
    // Decide whether this request is eligible for group commit and set
    // `can_group_commit` accordingly. Returns an error status when the
    // request is malformed (bad content length) or violates group-commit
    // constraints (label set together with group_commit, no WAL space).
    int64_t content_length = 0;
    if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
        // Parse defensively: a malformed header must yield InvalidArgument,
        // not an uncaught std::stoll exception (matches the CONTENT_LENGTH
        // handling in the streaming-put path above).
        try {
            content_length = std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
        } catch (const std::exception& e) {
            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
                                           req->header(HttpHeaders::CONTENT_LENGTH), e.what());
        }
    }
    if (content_length < 0) {
        std::stringstream ss;
        ss << "This stream load content length <0 (" << content_length
           << "), please check your content length.";
        LOG(WARNING) << ss.str();
        return Status::InvalidArgument(ss.str());
    }
    auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
                    req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos;
    if (content_length == 0 && !is_chunk) {
        // Empty, non-chunked body: nothing to group-commit.
        can_group_commit = false;
        return Status::OK();
    }
    if (is_chunk) {
        // Chunked loads cannot carry a user label through group commit.
        ctx->label = "";
    }

    // Group commit is incompatible with partial updates, explicit (temp)
    // partitions, two-phase commit, and the unique-key partial update modes.
    auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
                           iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
    auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
    auto partitions = !req->header(HTTP_PARTITIONS).empty();
    auto update_mode =
            !req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
            (iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FIXED_COLUMNS") ||
             iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS"));
    if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
        !update_mode) {
        // A user-supplied label conflicts with explicit group commit (unless
        // the BE is configured to wait for internal group commits).
        if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() &&
            !ctx->label.empty()) {
            return Status::InvalidArgument("label and group_commit can't be set at the same time");
        }
        RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length));
        can_group_commit = true;
    }
    return Status::OK();
}
958
959
Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
                                              std::shared_ptr<StreamLoadContext> ctx) {
    // Determine whether this load goes through the group-commit path and in
    // which mode, filling ctx->group_commit_mode and ctx->group_commit.
    std::string mode = req->header(HTTP_GROUP_COMMIT);
    bool mode_ok = mode.empty() || iequal(mode, SYNC_MODE) || iequal(mode, ASYNC_MODE) ||
                   iequal(mode, OFF_MODE);
    if (!mode_ok) {
        return Status::InvalidArgument(
                "group_commit can only be [async_mode, sync_mode, off_mode]");
    }
    // BE config may force every group commit to be waited on synchronously.
    if (config::wait_internal_group_commit_finish) {
        mode = SYNC_MODE;
    }

    // Explicit off_mode disables group commit outright.
    if (iequal(mode, OFF_MODE)) {
        ctx->group_commit_mode = OFF_MODE;
        ctx->group_commit = false;
        return Status::OK();
    }

    bool eligible = false;
    RETURN_IF_ERROR(_can_group_commit(req, ctx, mode, eligible));
    if (!eligible) {
        ctx->group_commit_mode = OFF_MODE;
        ctx->group_commit = false;
    } else if (!mode.empty()) {
        ctx->group_commit_mode = mode;
        ctx->group_commit = true;
    } else {
        // No explicit header: defer to the table property to decide.
        ctx->group_commit_mode = "";
        ctx->group_commit = false;
    }
    return Status::OK();
}
994
995
} // namespace doris