be/src/cloud/cloud_stream_load_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_stream_load_executor.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | |
22 | | #include "cloud/cloud_meta_mgr.h" |
23 | | #include "cloud/cloud_storage_engine.h" |
24 | | #include "cloud/config.h" |
25 | | #include "common/logging.h" |
26 | | #include "common/status.h" |
27 | | #include "load/stream_load/stream_load_context.h" |
28 | | #include "util/debug_points.h" |
29 | | |
30 | | namespace doris { |
31 | | |
32 | | bvar::Adder<uint64_t> stream_load_commit_retry_counter("stream_load_commit_retry_counter"); |
33 | | bvar::Window<bvar::Adder<uint64_t>> stream_load_commit_retry_counter_minute( |
34 | | "stream_load_commit_retry_counter", "1m", &stream_load_commit_retry_counter, 60); |
35 | | |
// Describes which piece of information in a StreamLoadContext identifies the
// transaction that a commit/abort/rollback request refers to.
enum class TxnOpParamType : int {
    // Neither a positive txn id nor a non-empty label is available.
    ILLEGAL = 0,
    // The context carries a positive txn id.
    WITH_TXN_ID = 1,
    // No usable txn id, but the context carries a non-empty label.
    WITH_LABEL = 2,
};
41 | | |
42 | | CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env) |
43 | 0 | : StreamLoadExecutor(exec_env) {} |
44 | | |
45 | 0 | CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default; |
46 | | |
47 | 0 | Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) { |
48 | 0 | auto st = _exec_env->storage_engine().to_cloud().meta_mgr().precommit_txn(*ctx); |
49 | 0 | if (!st.ok()) { |
50 | 0 | LOG(WARNING) << "Failed to precommit txn: " << st << ", " << ctx->brief(); |
51 | 0 | return st; |
52 | 0 | } |
53 | 0 | ctx->need_rollback = false; |
54 | 0 | return st; |
55 | 0 | } |
56 | | |
57 | 0 | Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { |
58 | 0 | std::stringstream ss; |
59 | 0 | ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label |
60 | 0 | << " txn_2pc_op=" << ctx->txn_operation; |
61 | 0 | std::string op_info = ss.str(); |
62 | 0 | VLOG_DEBUG << "operate_txn_2pc " << op_info; |
63 | 0 | TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID |
64 | 0 | : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL |
65 | 0 | : TxnOpParamType::ILLEGAL; |
66 | |
|
67 | 0 | Status st = Status::InternalError<false>("impossible branch reached, " + op_info); |
68 | |
|
69 | 0 | if (ctx->txn_operation.compare("commit") == 0) { |
70 | 0 | if (!config::enable_stream_load_commit_txn_on_be) { |
71 | 0 | VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info; |
72 | 0 | st = StreamLoadExecutor::operate_txn_2pc(ctx); |
73 | 0 | } else if (topt == TxnOpParamType::WITH_TXN_ID) { |
74 | 0 | VLOG_DEBUG << "2pc commit stream load txn directly: " << op_info; |
75 | 0 | st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true); |
76 | 0 | } else if (topt == TxnOpParamType::WITH_LABEL) { |
77 | 0 | VLOG_DEBUG << "2pc commit stream load txn with FE support: " << op_info; |
78 | 0 | st = StreamLoadExecutor::operate_txn_2pc(ctx); |
79 | 0 | } else { |
80 | 0 | st = Status::InternalError<false>( |
81 | 0 | "failed to 2pc commit txn, with TxnOpParamType::illegal input, " + op_info); |
82 | 0 | } |
83 | 0 | } else if (ctx->txn_operation.compare("abort") == 0) { |
84 | 0 | if (topt == TxnOpParamType::WITH_TXN_ID) { |
85 | 0 | LOG(INFO) << "2pc abort stream load txn directly: " << op_info; |
86 | 0 | st = _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx); |
87 | 0 | WARN_IF_ERROR(st, "failed to rollback txn " + op_info); |
88 | 0 | } else if (topt == TxnOpParamType::WITH_LABEL) { // maybe a label send to FE to abort |
89 | 0 | VLOG_DEBUG << "2pc abort stream load txn with FE support: " << op_info; |
90 | 0 | StreamLoadExecutor::rollback_txn(ctx); |
91 | 0 | st = Status::OK(); |
92 | 0 | } else { |
93 | 0 | st = Status::InternalError<false>("failed abort txn, with illegal input, " + op_info); |
94 | 0 | } |
95 | 0 | } else { |
96 | 0 | std::string msg = |
97 | 0 | "failed to operate_txn_2pc, unrecognized operation: " + ctx->txn_operation; |
98 | 0 | LOG(WARNING) << msg << " " << op_info; |
99 | 0 | st = Status::InternalError<false>(msg + " " + op_info); |
100 | 0 | } |
101 | 0 | WARN_IF_ERROR(st, "failed to operate_txn_2pc " + op_info) |
102 | 0 | return st; |
103 | 0 | } |
104 | | |
105 | 0 | Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { |
106 | 0 | DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); |
107 | 0 | DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.crash", { |
108 | 0 | LOG(INFO) << "debug point " << DP_NAME << " trigger crash"; |
109 | 0 | volatile int* p = nullptr; |
110 | 0 | *p = 1; |
111 | 0 | }); |
112 | | // forward to fe to excute commit transaction for MoW table |
113 | 0 | if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be || |
114 | 0 | ctx->load_type == TLoadType::ROUTINE_LOAD) { |
115 | 0 | Status st; |
116 | 0 | int retry_times = 0; |
117 | 0 | while (retry_times < config::mow_stream_load_commit_retry_times) { |
118 | 0 | st = StreamLoadExecutor::commit_txn(ctx); |
119 | | // DELETE_BITMAP_LOCK_ERROR will be retried |
120 | 0 | if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) { |
121 | 0 | break; |
122 | 0 | } |
123 | 0 | LOG_WARNING("Failed to commit txn") |
124 | 0 | .tag("txn_id", ctx->txn_id) |
125 | 0 | .tag("retry_times", retry_times) |
126 | 0 | .error(st); |
127 | 0 | retry_times++; |
128 | 0 | stream_load_commit_retry_counter << 1; |
129 | 0 | } |
130 | 0 | return st; |
131 | 0 | } |
132 | | |
133 | 0 | auto st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false); |
134 | 0 | if (!st.ok()) { |
135 | 0 | LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief(); |
136 | 0 | return st; |
137 | 0 | } |
138 | 0 | ctx->need_rollback = false; |
139 | 0 | return st; |
140 | 0 | } |
141 | | |
142 | 0 | void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { |
143 | 0 | std::stringstream ss; |
144 | 0 | ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label; |
145 | 0 | std::string op_info = ss.str(); |
146 | 0 | LOG(INFO) << "rollback stream load txn " << op_info; |
147 | 0 | TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID |
148 | 0 | : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL |
149 | 0 | : TxnOpParamType::ILLEGAL; |
150 | |
|
151 | 0 | if (topt == TxnOpParamType::WITH_TXN_ID && ctx->load_type != TLoadType::ROUTINE_LOAD) { |
152 | 0 | VLOG_DEBUG << "abort stream load txn directly: " << op_info; |
153 | 0 | WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx), |
154 | 0 | "failed to rollback txn " + op_info); |
155 | 0 | } else { // maybe a label send to FE to abort |
156 | | // does not care about the return status |
157 | | // ctx->db_id > 0 && !ctx->label.empty() |
158 | 0 | VLOG_DEBUG << "abort stream load txn with FE support: " << op_info; |
159 | 0 | StreamLoadExecutor::rollback_txn(ctx); |
160 | 0 | } |
161 | 0 | } |
162 | | |
163 | | } // namespace doris |