be/src/service/http/action/stream_load_2pc.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "service/http/action/stream_load_2pc.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | #include <rapidjson/encodings.h> |
22 | | #include <rapidjson/prettywriter.h> |
23 | | #include <rapidjson/stringbuffer.h> |
24 | | |
25 | | #include <exception> |
26 | | #include <memory> |
27 | | #include <new> |
28 | | #include <ostream> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "load/stream_load/stream_load_context.h" |
32 | | #include "load/stream_load/stream_load_executor.h" |
33 | | #include "runtime/exec_env.h" |
34 | | #include "service/http/http_channel.h" |
35 | | #include "service/http/http_common.h" |
36 | | #include "service/http/http_request.h" |
37 | | #include "service/http/http_status.h" |
38 | | #include "service/http/utils.h" |
39 | | |
40 | | namespace doris { |
41 | | |
42 | 1 | StreamLoad2PCAction::StreamLoad2PCAction(ExecEnv* exec_env) : _exec_env(exec_env) {} |
43 | | |
44 | 0 | void StreamLoad2PCAction::handle(HttpRequest* req) { |
45 | 0 | Status status = Status::OK(); |
46 | 0 | std::string status_result; |
47 | 0 | std::string msg; |
48 | |
|
49 | 0 | std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); |
50 | 0 | ctx->db = req->param(HTTP_DB_KEY); |
51 | 0 | if (!req->header(HTTP_LABEL_KEY).empty()) { |
52 | 0 | std::string req_label = req->header(HTTP_LABEL_KEY); |
53 | 0 | ctx->label = req_label; |
54 | 0 | msg.append("label [" + req_label + "] "); |
55 | 0 | } |
56 | 0 | if (!req->header(HTTP_TXN_ID_KEY).empty()) { |
57 | 0 | std::string req_txn_id = req->header(HTTP_TXN_ID_KEY); |
58 | 0 | msg.append("transaction [" + req_txn_id + "] "); |
59 | 0 | try { |
60 | 0 | ctx->txn_id = std::stoull(req_txn_id); |
61 | 0 | } catch (const std::exception& e) { |
62 | 0 | status = Status::InternalError("convert txn_id [{}] failed, reason={}", req_txn_id, |
63 | 0 | e.what()); |
64 | 0 | status_result = status.to_json(); |
65 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, status_result); |
66 | 0 | return; |
67 | 0 | } |
68 | 0 | } |
69 | 0 | ctx->txn_operation = req->header(HTTP_TXN_OPERATION_KEY); |
70 | 0 | msg.append(ctx->txn_operation + " successfully."); |
71 | 0 | if (ctx->txn_operation.compare("commit") != 0 && ctx->txn_operation.compare("abort") != 0) { |
72 | 0 | status = Status::InternalError("transaction operation should be \'commit\' or \'abort\'"); |
73 | 0 | status_result = status.to_json(); |
74 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, status_result); |
75 | 0 | return; |
76 | 0 | } |
77 | | |
78 | 0 | if (!parse_basic_auth(*req, &ctx->auth)) { |
79 | 0 | LOG(WARNING) << "parse basic authorization failed."; |
80 | 0 | status = Status::InternalError("no valid Basic authorization"); |
81 | 0 | } |
82 | |
|
83 | 0 | status = _exec_env->stream_load_executor()->operate_txn_2pc(ctx.get()); |
84 | |
|
85 | 0 | if (!status.ok()) { |
86 | 0 | status_result = status.to_json(); |
87 | 0 | } else { |
88 | 0 | status_result = get_success_info(msg, ctx->txn_operation); |
89 | 0 | } |
90 | 0 | HttpChannel::send_reply(req, HttpStatus::OK, status_result); |
91 | 0 | } |
92 | | |
93 | | std::string StreamLoad2PCAction::get_success_info(const std::string msg, |
94 | 0 | const std::string txn_operation) { |
95 | 0 | rapidjson::StringBuffer s; |
96 | 0 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s); |
97 | |
|
98 | 0 | writer.StartObject(); |
99 | | // status |
100 | 0 | writer.Key("status"); |
101 | 0 | writer.String("Success"); |
102 | | // msg |
103 | 0 | writer.Key("msg"); |
104 | 0 | writer.String(msg.c_str()); |
105 | 0 | writer.EndObject(); |
106 | 0 | return s.GetString(); |
107 | 0 | } |
108 | | |
109 | | } // namespace doris |