be/src/load/group_commit/wal/wal_table.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/group_commit/wal/wal_table.h" |
19 | | |
20 | | #include <absl/strings/str_split.h> |
21 | | #include <thrift/protocol/TDebugProtocol.h> |
22 | | |
23 | | #include "io/fs/local_file_system.h" |
24 | | #include "io/fs/stream_load_pipe.h" |
25 | | #include "load/group_commit/wal/wal_manager.h" |
26 | | #include "runtime/fragment_mgr.h" |
27 | | #include "service/http/action/http_stream.h" |
28 | | #include "service/http/action/stream_load.h" |
29 | | #include "service/http/ev_http_server.h" |
30 | | #include "service/http/http_common.h" |
31 | | #include "service/http/http_headers.h" |
32 | | #include "service/http/utils.h" |
33 | | #include "util/client_cache.h" |
34 | | #include "util/path_util.h" |
35 | | #include "util/thrift_rpc_helper.h" |
36 | | |
37 | | namespace doris { |
38 | | |
39 | | bvar::Adder<uint64_t> wal_fail("group_commit_wal_fail"); |
40 | | |
41 | | WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) |
42 | 0 | : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) { |
43 | 0 | _http_stream_action = std::make_shared<HttpStreamAction>(exec_env); |
44 | 0 | } |
45 | 0 | WalTable::~WalTable() {} |
46 | | |
47 | 0 | void WalTable::add_wal(int64_t wal_id, std::string wal) { |
48 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
49 | 0 | LOG(INFO) << "add replay wal=" << wal; |
50 | 0 | auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis()); |
51 | 0 | _replay_wal_map.emplace(wal, wal_info); |
52 | 0 | } |
53 | | |
54 | 0 | void WalTable::_pick_relay_wals() { |
55 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
56 | 0 | std::vector<std::string> need_replay_wals; |
57 | 0 | std::vector<std::string> need_erase_wals; |
58 | 0 | for (const auto& [wal_path, wal_info] : _replay_wal_map) { |
59 | 0 | if (config::group_commit_wait_replay_wal_finish && |
60 | 0 | wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { |
61 | 0 | LOG(WARNING) << "failed to replay wal=" << wal_path << " after retry " |
62 | 0 | << wal_info->get_retry_num() << " times"; |
63 | 0 | [[maybe_unused]] auto st = _exec_env->wal_mgr()->rename_to_tmp_path( |
64 | 0 | wal_path, _table_id, wal_info->get_wal_id()); |
65 | 0 | auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id()); |
66 | 0 | if (!notify_st.ok()) { |
67 | 0 | LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail"; |
68 | 0 | } |
69 | 0 | need_erase_wals.push_back(wal_path); |
70 | 0 | continue; |
71 | 0 | } |
72 | 0 | if (_need_replay(wal_info)) { |
73 | 0 | need_replay_wals.push_back(wal_path); |
74 | 0 | } |
75 | 0 | } |
76 | 0 | for (const auto& wal : need_erase_wals) { |
77 | 0 | _replay_wal_map.erase(wal); |
78 | 0 | } |
79 | 0 | std::sort(need_replay_wals.begin(), need_replay_wals.end()); |
80 | 0 | for (const auto& wal : need_replay_wals) { |
81 | 0 | _replaying_queue.emplace_back(_replay_wal_map[wal]); |
82 | 0 | _replay_wal_map.erase(wal); |
83 | 0 | } |
84 | 0 | } |
85 | | |
86 | 0 | Status WalTable::_relay_wal_one_by_one() { |
87 | 0 | std::vector<std::shared_ptr<WalInfo>> need_retry_wals; |
88 | 0 | for (auto wal_info : _replaying_queue) { |
89 | 0 | wal_info->add_retry_num(); |
90 | 0 | Status st; |
91 | 0 | int64_t file_size = 0; |
92 | 0 | std::filesystem::path file_path(wal_info->get_wal_path()); |
93 | 0 | if (!std::filesystem::exists(file_path)) { |
94 | 0 | st = Status::InternalError("wal file {} does not exist", wal_info->get_wal_path()); |
95 | 0 | } else { |
96 | 0 | file_size = std::filesystem::file_size(file_path); |
97 | 0 | st = _replay_wal_internal(wal_info->get_wal_path()); |
98 | 0 | } |
99 | 0 | auto msg = st.msg(); |
100 | 0 | if (st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() || st.is<ErrorCode::NOT_FOUND>() || |
101 | 0 | st.is<ErrorCode::DATA_QUALITY_ERROR>() || |
102 | 0 | (msg.find("has already been used") != msg.npos && |
103 | 0 | (msg.find("COMMITTED") != msg.npos || msg.find("VISIBLE") != msg.npos))) { |
104 | 0 | LOG(INFO) << "succeed to replay wal=" << wal_info->get_wal_path() |
105 | 0 | << ", st=" << st.to_string() << ", file size=" << file_size; |
106 | | // delete wal |
107 | 0 | WARN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(_table_id, wal_info->get_wal_id()), |
108 | 0 | "failed to delete wal=" + wal_info->get_wal_path()); |
109 | 0 | if (config::group_commit_wait_replay_wal_finish) { |
110 | 0 | RETURN_IF_ERROR(_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id())); |
111 | 0 | } |
112 | 0 | } else { |
113 | 0 | doris::wal_fail << 1; |
114 | 0 | LOG(WARNING) << "failed to replay wal=" << wal_info->get_wal_path() |
115 | 0 | << ", st=" << st.to_string(); |
116 | 0 | need_retry_wals.push_back(wal_info); |
117 | 0 | } |
118 | 0 | } |
119 | 0 | { |
120 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
121 | 0 | _replaying_queue.clear(); |
122 | 0 | for (auto retry_wal_info : need_retry_wals) { |
123 | 0 | _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); |
124 | 0 | } |
125 | 0 | } |
126 | 0 | return Status::OK(); |
127 | 0 | } |
128 | | |
129 | 0 | Status WalTable::replay_wals() { |
130 | 0 | { |
131 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
132 | 0 | if (_replay_wal_map.empty()) { |
133 | 0 | LOG(INFO) << "_replay_wal_map is empty, skip relaying for table_id=" << _table_id; |
134 | 0 | return Status::OK(); |
135 | 0 | } |
136 | 0 | if (!_replaying_queue.empty()) { |
137 | 0 | LOG(INFO) << "_replaying_queue is not empty, skip relaying for table_id=" << _table_id; |
138 | 0 | return Status::OK(); |
139 | 0 | } |
140 | 0 | } |
141 | 0 | VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id |
142 | 0 | << ", wal size=" << _replay_wal_map.size(); |
143 | 0 | _pick_relay_wals(); |
144 | 0 | RETURN_IF_ERROR(_relay_wal_one_by_one()); |
145 | 0 | return Status::OK(); |
146 | 0 | } |
147 | | |
148 | 0 | bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) { |
149 | 0 | if (config::group_commit_wait_replay_wal_finish) { |
150 | 0 | return true; |
151 | 0 | } |
152 | | #ifndef BE_TEST |
153 | | int64_t replay_interval = 0; |
154 | | if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { |
155 | | replay_interval = |
156 | | int64_t(pow(2, config::group_commit_replay_wal_retry_num) * |
157 | | config::group_commit_replay_wal_retry_interval_seconds * 1000 + |
158 | | (wal_info->get_retry_num() - config::group_commit_replay_wal_retry_num) * |
159 | | config::group_commit_replay_wal_retry_interval_max_seconds * 1000); |
160 | | } else { |
161 | | replay_interval = int64_t(pow(2, wal_info->get_retry_num()) * |
162 | | config::group_commit_replay_wal_retry_interval_seconds * 1000); |
163 | | } |
164 | | return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval; |
165 | | #else |
166 | 0 | return true; |
167 | 0 | #endif |
168 | 0 | } |
169 | | |
170 | 0 | Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) { |
171 | 0 | TLoadTxnRollbackRequest request; |
172 | | // this is a fake, fe not check it now |
173 | | // should be removed in 3.1, use token instead |
174 | 0 | request.__set_auth_code(0); |
175 | 0 | request.__set_token(_exec_env->cluster_info()->curr_auth_token); |
176 | 0 | request.__set_db_id(db_id); |
177 | 0 | request.__set_label(label); |
178 | 0 | request.__set_reason("relay wal with label " + label); |
179 | 0 | TLoadTxnRollbackResult result; |
180 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
181 | 0 | auto st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
182 | 0 | master_addr.hostname, master_addr.port, |
183 | 0 | [&request, &result](FrontendServiceConnection& client) { |
184 | 0 | client->loadTxnRollback(result, request); |
185 | 0 | }); |
186 | 0 | auto result_status = Status::create<false>(result.status); |
187 | 0 | LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status; |
188 | 0 | return result_status; |
189 | 0 | } |
190 | | |
191 | 0 | Status WalTable::_replay_wal_internal(const std::string& wal) { |
192 | 0 | LOG(INFO) << "start replay wal=" << wal; |
193 | 0 | int64_t version = -1; |
194 | 0 | int64_t backend_id = -1; |
195 | 0 | int64_t wal_id = -1; |
196 | 0 | std::string label = ""; |
197 | 0 | io::Path wal_path = wal; |
198 | 0 | auto file_name = wal_path.filename().string(); |
199 | 0 | RETURN_IF_ERROR(WalManager::parse_wal_path(file_name, version, backend_id, wal_id, label)); |
200 | | #ifndef BE_TEST |
201 | | if (!config::group_commit_wait_replay_wal_finish) { |
202 | | [[maybe_unused]] auto st = _try_abort_txn(_db_id, label); |
203 | | } |
204 | | #endif |
205 | 0 | DBUG_EXECUTE_IF("WalTable.replay_wals.stop", |
206 | 0 | { return Status::InternalError("WalTable.replay_wals.stop"); }); |
207 | 0 | return _replay_one_wal_with_streamload(wal_id, wal, label); |
208 | 0 | } |
209 | | |
210 | | Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, |
211 | 0 | std::string& sql_str) { |
212 | 0 | std::string columns; |
213 | 0 | RETURN_IF_ERROR(_read_wal_header(wal, columns)); |
214 | 0 | std::vector<std::string> column_id_vector = |
215 | 0 | absl::StrSplit(columns, ",", absl::SkipWhitespace()); |
216 | 0 | std::map<int64_t, std::string> column_info_map; |
217 | 0 | RETURN_IF_ERROR(_get_column_info(_db_id, _table_id, column_info_map)); |
218 | 0 | std::stringstream ss_name; |
219 | 0 | for (auto column_id_str : column_id_vector) { |
220 | 0 | try { |
221 | 0 | int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); |
222 | 0 | auto it = column_info_map.find(column_id); |
223 | 0 | if (it != column_info_map.end()) { |
224 | 0 | ss_name << "`" << it->second << "`,"; |
225 | 0 | column_info_map.erase(column_id); |
226 | 0 | } |
227 | 0 | } catch (const std::invalid_argument& e) { |
228 | 0 | return Status::InvalidArgument("Invalid format, {}", e.what()); |
229 | 0 | } |
230 | 0 | } |
231 | 0 | auto name = ss_name.str().substr(0, ss_name.str().size() - 1); |
232 | 0 | std::stringstream ss; |
233 | 0 | ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" |
234 | 0 | << name << ") select " << name << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" |
235 | 0 | << std::to_string(_table_id) << "\")"; |
236 | 0 | sql_str = ss.str().data(); |
237 | 0 | return Status::OK(); |
238 | 0 | } |
239 | | |
240 | | Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, |
241 | 0 | const std::string& label) { |
242 | 0 | std::string sql_str; |
243 | 0 | RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str)); |
244 | 0 | std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); |
245 | 0 | ctx->sql_str = sql_str; |
246 | 0 | ctx->db_id = _db_id; |
247 | 0 | ctx->table_id = _table_id; |
248 | 0 | ctx->wal_id = wal_id; |
249 | 0 | ctx->label = label; |
250 | 0 | ctx->need_commit_self = false; |
251 | 0 | ctx->auth.token = _exec_env->cluster_info()->curr_auth_token; |
252 | 0 | ctx->auth.user = "admin"; |
253 | 0 | ctx->group_commit = false; |
254 | 0 | ctx->load_type = TLoadType::MANUL_LOAD; |
255 | 0 | ctx->load_src_type = TLoadSourceType::RAW; |
256 | 0 | ctx->max_filter_ratio = 1; |
257 | 0 | auto st = _http_stream_action->process_put(nullptr, ctx); |
258 | 0 | DBUG_EXECUTE_IF("WalTable::_handle_stream_load.fail", |
259 | 0 | { st = Status::InternalError("WalTable::_handle_stream_load.fail"); }); |
260 | 0 | if (st.ok()) { |
261 | | // wait stream load finish |
262 | 0 | RETURN_IF_ERROR(ctx->load_status_future.get()); |
263 | 0 | if (ctx->status.ok()) { |
264 | | // deprecated and should be removed in 3.1, use token instead. |
265 | 0 | ctx->auth.auth_code = wal_id; |
266 | 0 | st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); |
267 | 0 | } else { |
268 | 0 | st = ctx->status; |
269 | 0 | } |
270 | 0 | } |
271 | 0 | if (!st.ok()) { |
272 | 0 | _exec_env->stream_load_executor()->rollback_txn(ctx.get()); |
273 | 0 | } |
274 | 0 | return st; |
275 | 0 | } |
276 | | |
277 | | Status WalTable::_replay_one_wal_with_streamload(int64_t wal_id, const std::string& wal, |
278 | 0 | const std::string& label) { |
279 | | #ifndef BE_TEST |
280 | | return _handle_stream_load(wal_id, wal, label); |
281 | | #else |
282 | 0 | return Status::OK(); |
283 | 0 | #endif |
284 | 0 | } |
285 | | |
286 | 0 | void WalTable::stop() { |
287 | 0 | do { |
288 | 0 | { |
289 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
290 | 0 | if (_replay_wal_map.empty() && _replaying_queue.empty()) { |
291 | 0 | break; |
292 | 0 | } |
293 | 0 | LOG(INFO) << "stopping wal_table,wait for relay wal task done, now " |
294 | 0 | << _replay_wal_map.size() << " wals wait to replay, " |
295 | 0 | << _replaying_queue.size() << " wals are replaying"; |
296 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(1000)); |
297 | 0 | } |
298 | 0 | } while (true); |
299 | 0 | } |
300 | | |
301 | 0 | size_t WalTable::size() { |
302 | 0 | std::lock_guard<std::mutex> lock(_replay_wal_lock); |
303 | 0 | return _replay_wal_map.size() + _replaying_queue.size(); |
304 | 0 | } |
305 | | |
306 | | Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id, |
307 | 0 | std::map<int64_t, std::string>& column_info_map) { |
308 | 0 | TGetColumnInfoRequest request; |
309 | 0 | request.__set_db_id(db_id); |
310 | 0 | request.__set_table_id(tb_id); |
311 | 0 | TGetColumnInfoResult result; |
312 | 0 | Status status; |
313 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
314 | 0 | if (master_addr.hostname.empty() || master_addr.port == 0) { |
315 | 0 | status = Status::InternalError<false>("Have not get FE Master heartbeat yet"); |
316 | 0 | } else { |
317 | 0 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
318 | 0 | master_addr.hostname, master_addr.port, |
319 | 0 | [&request, &result](FrontendServiceConnection& client) { |
320 | 0 | client->getColumnInfo(result, request); |
321 | 0 | })); |
322 | 0 | status = Status::create<false>(result.status); |
323 | 0 | if (!status.ok()) { |
324 | 0 | return status; |
325 | 0 | } |
326 | 0 | std::vector<TColumnInfo> column_element = result.columns; |
327 | 0 | for (auto column : column_element) { |
328 | 0 | auto column_name = column.column_name; |
329 | 0 | auto column_id = column.column_id; |
330 | 0 | column_info_map.emplace(column_id, column_name); |
331 | 0 | } |
332 | 0 | } |
333 | 0 | return status; |
334 | 0 | } |
335 | | |
336 | 0 | Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) { |
337 | 0 | std::shared_ptr<doris::WalFileReader> wal_reader = |
338 | 0 | std::make_shared<doris::WalFileReader>(wal_path); |
339 | 0 | RETURN_IF_ERROR(wal_reader->init()); |
340 | 0 | uint32_t version = 0; |
341 | 0 | RETURN_IF_ERROR(wal_reader->read_header(version, columns)); |
342 | 0 | VLOG_DEBUG << "wal=" << wal_path << ",version=" << std::to_string(version) |
343 | 0 | << ",columns=" << columns; |
344 | 0 | RETURN_IF_ERROR(wal_reader->finalize()); |
345 | 0 | return Status::OK(); |
346 | 0 | } |
347 | | |
348 | | } // namespace doris |