Coverage Report

Created: 2026-06-23 10:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/http_stream.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/http_stream.h"
19
20
#include <algorithm>
21
#include <cstddef>
22
#include <future>
23
#include <sstream>
24
25
// use string iequal
26
#include <event2/buffer.h>
27
#include <event2/bufferevent.h>
28
#include <event2/http.h>
29
#include <gen_cpp/FrontendService.h>
30
#include <gen_cpp/FrontendService_types.h>
31
#include <gen_cpp/HeartbeatService_types.h>
32
#include <rapidjson/prettywriter.h>
33
#include <thrift/protocol/TDebugProtocol.h>
34
35
#include "cloud/config.h"
36
#include "common/config.h"
37
#include "common/logging.h"
38
#include "common/metrics/doris_metrics.h"
39
#include "common/metrics/metrics.h"
40
#include "common/status.h"
41
#include "common/utils.h"
42
#include "io/fs/stream_load_pipe.h"
43
#include "load/group_commit/group_commit_mgr.h"
44
#include "load/load_path_mgr.h"
45
#include "load/stream_load/new_load_stream_mgr.h"
46
#include "load/stream_load/stream_load_context.h"
47
#include "load/stream_load/stream_load_executor.h"
48
#include "load/stream_load/stream_load_recorder.h"
49
#include "runtime/exec_env.h"
50
#include "runtime/fragment_mgr.h"
51
#include "service/http/http_channel.h"
52
#include "service/http/http_common.h"
53
#include "service/http/http_headers.h"
54
#include "service/http/http_request.h"
55
#include "service/http/utils.h"
56
#include "storage/storage_engine.h"
57
#include "util/byte_buffer.h"
58
#include "util/client_cache.h"
59
#include "util/load_util.h"
60
#include "util/string_util.h"
61
#include "util/thrift_rpc_helper.h"
62
#include "util/time.h"
63
#include "util/uid_util.h"
64
65
namespace doris {
66
using namespace ErrorCode;
67
68
namespace {
69
70
10
bool is_compressed_file_scan(const TPipelineFragmentParams& params) {
71
10
    if (!params.__isset.file_scan_params) {
72
0
        return false;
73
0
    }
74
10
    return std::ranges::any_of(params.file_scan_params, [](const auto& file_scan_param) {
75
10
        const auto& file_scan_params = file_scan_param.second;
76
10
        TFileCompressType::type compress_type = file_scan_params.__isset.compress_type
77
10
                                                        ? file_scan_params.compress_type
78
10
                                                        : TFileCompressType::UNKNOWN;
79
10
        TFileFormatType::type format_type = file_scan_params.__isset.format_type
80
10
                                                    ? file_scan_params.format_type
81
10
                                                    : TFileFormatType::FORMAT_UNKNOWN;
82
10
        return LoadUtil::is_compressed_load(compress_type, format_type);
83
10
    });
84
10
}
85
86
} // namespace
87
88
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_requests_total, MetricUnit::REQUESTS);
89
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(http_stream_duration_ms, MetricUnit::MILLISECONDS);
90
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(http_stream_current_processing, MetricUnit::REQUESTS);
91
92
HttpStreamAction::HttpStreamAction(ExecEnv* exec_env)
93
8
        : HttpHandlerWithAuth(exec_env, TPrivilegeHier::GLOBAL, TPrivilegeType::LOAD) {
94
    // Use LOAD privilege type: requires LOAD permission
95
    // Note: _exec_env is set by parent class HttpHandlerWithAuth
96
8
    _http_stream_entity =
97
8
            DorisMetrics::instance()->metric_registry()->register_entity("http_stream");
98
8
    INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_requests_total);
99
8
    INT_COUNTER_METRIC_REGISTER(_http_stream_entity, http_stream_duration_ms);
100
8
    INT_GAUGE_METRIC_REGISTER(_http_stream_entity, http_stream_current_processing);
101
8
}
102
103
4
HttpStreamAction::~HttpStreamAction() {
104
4
    DorisMetrics::instance()->metric_registry()->deregister_entity(_http_stream_entity);
105
4
}
106
107
133
void HttpStreamAction::handle(HttpRequest* req) {
108
133
    std::shared_ptr<StreamLoadContext> ctx =
109
133
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
110
133
    if (ctx == nullptr) {
111
0
        return;
112
0
    }
113
114
    // status already set to fail
115
133
    if (ctx->status.ok()) {
116
112
        ctx->status = _handle(req, ctx);
117
112
        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
118
3
            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
119
3
                         << ", errmsg=" << ctx->status;
120
3
        }
121
112
    }
122
133
    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
123
124
133
    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
125
24
        if (ctx->body_sink != nullptr) {
126
24
            ctx->body_sink->cancel(ctx->status.to_string());
127
24
        }
128
24
    }
129
130
133
    if (!ctx->status.ok()) {
131
24
        auto str = std::string(ctx->to_json());
132
        // add new line at end
133
24
        str = str + '\n';
134
24
        HttpChannel::send_reply(req, str);
135
24
        return;
136
24
    }
137
109
    auto str = std::string(ctx->to_json());
138
    // add new line at end
139
109
    str = str + '\n';
140
109
    HttpChannel::send_reply(req, str);
141
109
    if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
142
109
        if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
143
109
            str = ctx->prepare_stream_load_record(str);
144
109
            _save_stream_load_record(ctx, str);
145
109
        }
146
109
    }
147
    // update statistics
148
109
    http_stream_requests_total->increment(1);
149
109
    http_stream_duration_ms->increment(ctx->load_cost_millis);
150
109
}
151
152
112
Status HttpStreamAction::_handle(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
153
112
    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
154
0
        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
155
0
                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
156
0
        return Status::Error<ErrorCode::NETWORK_ERROR>("receive body don't equal with body bytes");
157
0
    }
158
112
    RETURN_IF_ERROR(ctx->body_sink->finish());
159
160
    // wait stream load finish
161
112
    RETURN_IF_ERROR(ctx->load_status_future.get());
162
163
109
    if (ctx->group_commit) {
164
12
        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
165
12
        return Status::OK();
166
12
    }
167
168
97
    if (ctx->two_phase_commit) {
169
1
        int64_t pre_commit_start_time = MonotonicNanos();
170
1
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
171
1
        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
172
96
    } else {
173
        // If put file success we need commit this load
174
96
        int64_t commit_and_publish_start_time = MonotonicNanos();
175
96
        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
176
96
        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
177
96
    }
178
97
    return Status::OK();
179
97
}
180
181
133
int HttpStreamAction::on_header(HttpRequest* req) {
182
    // Call parent's auth check first
183
133
    int ret = HttpHandlerWithAuth::on_header(req);
184
133
    if (ret != 0) {
185
0
        return ret; // Auth failed, return error
186
0
    }
187
188
133
    http_stream_current_processing->increment(1);
189
190
133
    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
191
133
    req->set_handler_ctx(ctx);
192
193
133
    ctx->load_type = TLoadType::MANUL_LOAD;
194
133
    ctx->load_src_type = TLoadSourceType::RAW;
195
133
    ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
196
133
    Status st = _handle_group_commit(req, ctx);
197
198
133
    LOG(INFO) << "new income streaming load request." << ctx->brief()
199
133
              << " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit;
200
133
    if (st.ok()) {
201
133
        st = _on_header(req, ctx);
202
133
    }
203
133
    if (!st.ok()) {
204
0
        ctx->status = std::move(st);
205
0
        if (ctx->body_sink != nullptr) {
206
0
            ctx->body_sink->cancel(ctx->status.to_string());
207
0
        }
208
0
        auto str = ctx->to_json();
209
        // add new line at end
210
0
        str = str + '\n';
211
0
        HttpChannel::send_reply(req, str);
212
0
        if (config::enable_stream_load_record ||
213
0
            config::enable_stream_load_record_to_audit_log_table) {
214
0
            if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
215
0
                str = ctx->prepare_stream_load_record(str);
216
0
                _save_stream_load_record(ctx, str);
217
0
            }
218
0
        }
219
0
        return -1;
220
0
    }
221
133
    return 0;
222
133
}
223
224
133
Status HttpStreamAction::_on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx) {
225
    // auth information
226
133
    if (!parse_basic_auth(*http_req, &ctx->auth)) {
227
0
        LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
228
0
        return Status::NotAuthorized("no valid Basic authorization");
229
0
    }
230
231
    // TODO(zs) : need Need to request an FE to obtain information such as format
232
    // check content length
233
133
    ctx->body_bytes = 0;
234
133
    size_t csv_max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
235
133
    if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
236
130
        try {
237
130
            ctx->body_bytes = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
238
130
        } catch (const std::exception& e) {
239
0
            return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
240
0
                                           http_req->header(HttpHeaders::CONTENT_LENGTH), e.what());
241
0
        }
242
        // csv max body size
243
130
        if (ctx->body_bytes > csv_max_body_bytes) {
244
0
            LOG(WARNING) << "body exceed max size." << ctx->brief();
245
0
            return Status::Error<ErrorCode::EXCEEDED_LIMIT>(
246
0
                    "body size {} exceed BE's conf `streaming_load_max_mb` {}. increase it if you "
247
0
                    "are sure this load is reasonable",
248
0
                    ctx->body_bytes, csv_max_body_bytes);
249
0
        }
250
130
    }
251
252
133
    auto pipe = std::make_shared<io::StreamLoadPipe>(
253
133
            io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
254
133
            ctx->body_bytes /* total_length */);
255
133
    ctx->body_sink = pipe;
256
133
    ctx->pipe = pipe;
257
258
133
    RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(ctx->id, ctx));
259
260
    // Here, transactions are set from fe's NativeInsertStmt.
261
    // TODO(zs) : How to support two_phase_commit
262
263
133
    return Status::OK();
264
133
}
265
266
9.85k
void HttpStreamAction::on_chunk_data(HttpRequest* req) {
267
9.85k
    std::shared_ptr<StreamLoadContext> ctx =
268
9.85k
            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
269
9.85k
    if (ctx == nullptr || !ctx->status.ok()) {
270
8
        return;
271
8
    }
272
9.84k
    if (!req->header(HTTP_WAL_ID_KY).empty()) {
273
0
        ctx->wal_id = std::stoll(req->header(HTTP_WAL_ID_KY));
274
0
    }
275
9.84k
    struct evhttp_request* ev_req = req->get_evhttp_request();
276
9.84k
    auto evbuf = evhttp_request_get_input_buffer(ev_req);
277
278
    // In HttpStreamAction::on_chunk_data
279
    //      -> process_put
280
    //      -> StreamLoadExecutor::execute_plan_fragment
281
    //      -> exec_plan_fragment
282
    // , SCOPED_ATTACH_TASK will be called.
283
9.84k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->stream_load_pipe_tracker());
284
285
9.84k
    int64_t start_read_data_time = MonotonicNanos();
286
9.84k
    Status st = ctx->allocate_schema_buffer();
287
9.84k
    if (!st.ok()) {
288
0
        ctx->status = st;
289
0
        return;
290
0
    }
291
19.6k
    while (evbuffer_get_length(evbuf) > 0) {
292
9.84k
        ByteBufferPtr bb;
293
9.84k
        st = ByteBuffer::allocate(128 * 1024, &bb);
294
9.84k
        if (!st.ok()) {
295
0
            ctx->status = st;
296
0
            return;
297
0
        }
298
9.84k
        auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
299
9.84k
        bb->pos = remove_bytes;
300
9.84k
        bb->flip();
301
9.84k
        st = ctx->body_sink->append(bb);
302
        // schema_buffer stores 1M of data for parsing column information
303
        // need to determine whether to cache for the first time
304
9.84k
        if (ctx->is_read_schema) {
305
133
            if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
306
133
                ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
307
133
            } else {
308
0
                LOG(INFO) << "use a portion of data to request fe to obtain column information";
309
0
                ctx->is_read_schema = false;
310
0
                ctx->status = process_put(req, ctx);
311
0
            }
312
133
        }
313
9.84k
        if (!st.ok()) {
314
0
            LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
315
0
            ctx->status = st;
316
0
            return;
317
0
        }
318
9.84k
        ctx->receive_bytes += remove_bytes;
319
9.84k
    }
320
    // after all the data has been read and it has not reached 1M, it will execute here
321
9.84k
    if (ctx->is_read_schema) {
322
133
        LOG(INFO) << "after all the data has been read and it has not reached 1M, it will execute "
323
133
                  << "here";
324
133
        ctx->is_read_schema = false;
325
133
        ctx->status = process_put(req, ctx);
326
133
    }
327
9.84k
    ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
328
9.84k
}
329
330
133
void HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
331
133
    std::shared_ptr<StreamLoadContext> ctx = std::static_pointer_cast<StreamLoadContext>(param);
332
133
    if (ctx == nullptr) {
333
0
        return;
334
0
    }
335
    // sender is gone, make receiver know it
336
133
    if (ctx->body_sink != nullptr) {
337
133
        ctx->body_sink->cancel("sender is gone");
338
133
    }
339
    // remove stream load context from stream load manager and the resource will be released
340
133
    ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
341
133
    http_stream_current_processing->increment(-1);
342
133
}
343
344
Status HttpStreamAction::process_put(HttpRequest* http_req,
345
133
                                     std::shared_ptr<StreamLoadContext> ctx) {
346
133
    TStreamLoadPutRequest request;
347
133
    if (http_req != nullptr) {
348
133
        request.__set_load_sql(http_req->header(HTTP_SQL));
349
133
        if (!http_req->header(HTTP_MEMTABLE_ON_SINKNODE).empty()) {
350
0
            bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
351
0
            request.__set_memtable_on_sink_node(value);
352
0
        }
353
133
    } else {
354
0
        request.__set_token(ctx->auth.token);
355
0
        request.__set_load_sql(ctx->sql_str);
356
0
        ctx->auth.token = "";
357
0
    }
358
133
    set_request_auth(&request, ctx->auth);
359
133
    request.__set_loadId(ctx->id.to_thrift());
360
133
    request.__set_label(ctx->label);
361
133
    if (ctx->group_commit) {
362
13
        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
363
13
            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
364
13
        } else {
365
            // used for wait_internal_group_commit_finish
366
0
            request.__set_group_commit_mode("sync_mode");
367
0
        }
368
13
    }
369
133
    if (_exec_env->cluster_info()->backend_id != 0) {
370
133
        request.__set_backend_id(_exec_env->cluster_info()->backend_id);
371
133
    } else {
372
0
        LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
373
0
    }
374
133
    if (ctx->wal_id > 0) {
375
0
        request.__set_partial_update(false);
376
0
    }
377
378
    // plan this load
379
133
    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
380
133
    int64_t stream_load_put_start_time = MonotonicNanos();
381
133
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
382
133
            master_addr.hostname, master_addr.port,
383
133
            [&request, ctx](FrontendServiceConnection& client) {
384
133
                client->streamLoadPut(ctx->put_result, request);
385
133
            }));
386
133
    ctx->put_result.pipeline_params.query_options.__set_enable_strict_cast(false);
387
133
    ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;
388
133
    Status plan_status(Status::create(ctx->put_result.status));
389
133
    if (!plan_status.ok()) {
390
20
        LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << ctx->brief();
391
20
        return plan_status;
392
20
    }
393
113
    if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) {
394
1
        return Status::NotSupported("http stream 2pc is unsupported for mow table");
395
1
    }
396
112
    ctx->db = ctx->put_result.pipeline_params.db_name;
397
112
    ctx->table = ctx->put_result.pipeline_params.table_name;
398
112
    ctx->txn_id = ctx->put_result.pipeline_params.txn_conf.txn_id;
399
112
    ctx->label = ctx->put_result.pipeline_params.import_label;
400
112
    ctx->put_result.pipeline_params.__set_wal_id(ctx->wal_id);
401
112
    if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
402
        // FIXME find a way to avoid chunked stream load write large WALs
403
12
        size_t content_length = 0;
404
12
        if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
405
10
            try {
406
10
                content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
407
10
            } catch (const std::exception& e) {
408
0
                return Status::InvalidArgument("invalid HTTP header CONTENT_LENGTH={}: {}",
409
0
                                               http_req->header(HttpHeaders::CONTENT_LENGTH),
410
0
                                               e.what());
411
0
            }
412
10
            if (is_compressed_file_scan(ctx->put_result.pipeline_params)) {
413
5
                content_length *= 3;
414
5
            }
415
10
        }
416
12
        ctx->put_result.pipeline_params.__set_content_length(content_length);
417
12
    }
418
112
    TPipelineFragmentParamsList mocked;
419
112
    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked);
420
112
}
421
422
void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
423
109
                                                const std::string& str) {
424
109
    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
425
109
            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
426
427
109
    if (stream_load_recorder != nullptr) {
428
109
        std::string key =
429
109
                std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
430
109
        auto st = stream_load_recorder->put(key, str);
431
109
        if (st.ok()) {
432
109
            LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label
433
109
                      << ", key: " << key;
434
109
        }
435
109
    } else {
436
0
        LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null.";
437
0
    }
438
109
}
439
440
Status HttpStreamAction::_handle_group_commit(HttpRequest* req,
441
133
                                              std::shared_ptr<StreamLoadContext> ctx) {
442
133
    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
443
133
    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
444
133
        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
445
0
        return Status::InvalidArgument(
446
0
                "group_commit can only be [async_mode, sync_mode, off_mode]");
447
0
    }
448
133
    if (config::wait_internal_group_commit_finish) {
449
0
        group_commit_mode = "sync_mode";
450
0
    }
451
133
    int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty()
452
133
                                     ? 0
453
133
                                     : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH));
454
133
    if (content_length < 0) {
455
0
        std::stringstream ss;
456
0
        ss << "This http load content length <0 (" << content_length
457
0
           << "), please check your content length.";
458
0
        LOG(WARNING) << ss.str();
459
0
        return Status::InvalidArgument(ss.str());
460
0
    }
461
    // allow chunked stream load in flink
462
133
    auto is_chunk =
463
133
            !req->header(HttpHeaders::TRANSFER_ENCODING).empty() &&
464
133
            req->header(HttpHeaders::TRANSFER_ENCODING).find("chunked") != std::string::npos;
465
133
    if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") ||
466
133
        (content_length == 0 && !is_chunk)) {
467
        // off_mode and empty
468
120
        ctx->group_commit = false;
469
120
        return Status::OK();
470
120
    }
471
13
    if (is_chunk) {
472
2
        ctx->label = "";
473
2
    }
474
475
13
    auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
476
13
                           iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
477
13
    auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
478
13
    auto partitions = !req->header(HTTP_PARTITIONS).empty();
479
13
    if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
480
13
        if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
481
0
            return Status::InvalidArgument("label and group_commit can't be set at the same time");
482
0
        }
483
13
        ctx->group_commit = true;
484
13
        if (iequal(group_commit_mode, "async_mode")) {
485
13
            if (!load_size_smaller_than_wal_limit(content_length)) {
486
0
                std::stringstream ss;
487
0
                ss << "There is no space for group commit http load async WAL. This http load "
488
0
                      "size is "
489
0
                   << content_length << ". WAL dir info: "
490
0
                   << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
491
0
                LOG(WARNING) << ss.str();
492
0
                return Status::Error<EXCEEDED_LIMIT>(ss.str());
493
0
            }
494
13
        }
495
13
    }
496
13
    return Status::OK();
497
13
}
498
499
} // namespace doris