Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/http/action/stream_load_2pc.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http/action/stream_load_2pc.h"
19
20
#include <glog/logging.h>
21
#include <rapidjson/encodings.h>
22
#include <rapidjson/prettywriter.h>
23
#include <rapidjson/stringbuffer.h>
24
25
#include <exception>
26
#include <memory>
27
#include <new>
28
#include <ostream>
29
30
#include "common/status.h"
31
#include "load/stream_load/stream_load_context.h"
32
#include "load/stream_load/stream_load_executor.h"
33
#include "runtime/exec_env.h"
34
#include "service/http/http_channel.h"
35
#include "service/http/http_common.h"
36
#include "service/http/http_request.h"
37
#include "service/http/http_status.h"
38
#include "service/http/utils.h"
39
40
namespace doris {
41
42
1
// Creates the HTTP action for stream-load two-phase commit. The given ExecEnv
// pointer is stored as-is in _exec_env; nothing else is initialized here.
StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env)
        : _exec_env(exec_env) {
}
43
44
0
// Handles a stream-load two-phase-commit request (commit or abort of a transaction).
//
// Reads the database name from the request parameters and the label, transaction id
// and operation from the request headers, validates them, and then delegates to
// StreamLoadExecutor::operate_txn_2pc(). Every reply is sent with HttpStatus::OK;
// success or failure is reported through the JSON body.
//
// The request is rejected (error JSON, executor not called) when:
//   - the transaction id header is present but is not entirely a valid number,
//   - the operation header is neither "commit" nor "abort",
//   - the request carries no valid Basic authorization.
void StreamLoad2PCAction::handle(HttpRequest* req) {
    // Replies with the JSON form of an error status.
    auto reply_error = [req](const Status& error) {
        HttpChannel::send_reply(req, HttpStatus::OK, error.to_json());
    };

    // Human-readable summary that is returned to the client on success.
    std::string msg;

    std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env);
    ctx->db = req->param(HTTP_DB_KEY);

    const std::string& req_label = req->header(HTTP_LABEL_KEY);
    if (!req_label.empty()) {
        ctx->label = req_label;
        msg.append("label [" + req_label + "] ");
    }

    const std::string& req_txn_id = req->header(HTTP_TXN_ID_KEY);
    if (!req_txn_id.empty()) {
        msg.append("transaction [" + req_txn_id + "] ");

        bool parsed = false;
        std::string failure_reason;
        try {
            std::size_t consumed = 0;
            const auto txn_id = std::stoull(req_txn_id, &consumed);
            if (consumed == req_txn_id.size()) {
                ctx->txn_id = txn_id;
                parsed = true;
            } else {
                // std::stoull stops at the first non-digit; accepting a prefix such as
                // "12" out of "12abc" would operate on the wrong transaction.
                failure_reason = "unexpected trailing characters";
            }
        } catch (const std::exception& e) {
            failure_reason = e.what();
        }
        if (!parsed) {
            reply_error(Status::InternalError("convert txn_id [{}] failed, reason={}", req_txn_id,
                                              failure_reason));
            return;
        }
    }

    ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY);
    if (ctx->txn_operation != "commit" && ctx->txn_operation != "abort") {
        reply_error(
                Status::InternalError("transaction operation should be \'commit\' or \'abort\'"));
        return;
    }
    msg.append(ctx->txn_operation + " successfully.");

    if (!parse_basic_auth(*req, &ctx->auth)) {
        // Stop here: previously this error was overwritten by the executor's result
        // below, so the operation was attempted without valid credentials.
        LOG(WARNING) << "parse basic authorization failed.";
        reply_error(Status::InternalError("no valid Basic authorization"));
        return;
    }

    const Status status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx.get());
    const std::string status_result =
            status.ok() ? get_success_info(msg, ctx->txn_operation) : status.to_json();
    HttpChannel::send_reply(req, HttpStatus::OK, status_result);
}
92
93
std::string StreamLoad2PCAction::get_success_info(const std::string msg,
94
0
                                                  const std::string txn_operation) {
95
0
    rapidjson::StringBuffer s;
96
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
97
98
0
    writer.StartObject();
99
    // status
100
0
    writer.Key("status");
101
0
    writer.String("Success");
102
    // msg
103
0
    writer.Key("msg");
104
0
    writer.String(msg.c_str());
105
0
    writer.EndObject();
106
0
    return s.GetString();
107
0
}
108
109
} // namespace doris