be/src/load/group_commit/group_commit_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/group_commit/group_commit_mgr.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <chrono> |
24 | | |
25 | | #include "cloud/config.h" |
26 | | #include "common/compiler_util.h" |
27 | | #include "common/config.h" |
28 | | #include "common/status.h" |
29 | | #include "exec/pipeline/dependency.h" |
30 | | #include "runtime/exec_env.h" |
31 | | #include "runtime/fragment_mgr.h" |
32 | | #include "util/client_cache.h" |
33 | | #include "util/debug_points.h" |
34 | | #include "util/thrift_rpc_helper.h" |
35 | | |
36 | | namespace doris { |
37 | | #include "common/compile_check_begin.h" |
38 | | |
// Process-wide counter (exported via bvar) of how many times a group-commit
// producer was blocked because the shared queue memory limit was reached.
bvar::Adder<uint64_t> group_commit_block_by_memory_counter("group_commit_block_by_memory_counter");
40 | | |
41 | 0 | std::string LoadBlockQueue::_get_load_ids() { |
42 | 0 | std::stringstream ss; |
43 | 0 | ss << "["; |
44 | 0 | for (auto& id : _load_ids_to_write_dep) { |
45 | 0 | ss << id.first.to_string() << ", "; |
46 | 0 | } |
47 | 0 | ss << "]"; |
48 | 0 | return ss.str(); |
49 | 0 | } |
50 | | |
51 | | Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr<Block> block, |
52 | 0 | bool write_wal, UniqueId& load_id) { |
53 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed", |
54 | 0 | { return Status::InternalError("LoadBlockQueue.add_block.failed"); }); |
55 | 0 | std::unique_lock l(mutex); |
56 | 0 | RETURN_IF_ERROR(status); |
57 | 0 | if (UNLIKELY(runtime_state->is_cancelled())) { |
58 | 0 | return runtime_state->cancel_reason(); |
59 | 0 | } |
60 | 0 | RETURN_IF_ERROR(status); |
61 | 0 | LOG(INFO) << "query_id: " << print_id(runtime_state->query_id()) |
62 | 0 | << ", add block rows=" << block->rows() << ", use group_commit label=" << label; |
63 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK); |
64 | 0 | if (block->rows() > 0) { |
65 | 0 | if (!config::group_commit_wait_replay_wal_finish) { |
66 | 0 | _block_queue.emplace_back(block); |
67 | 0 | _data_bytes += block->bytes(); |
68 | 0 | size_t before_block_queues_bytes = _all_block_queues_bytes->load(); |
69 | 0 | _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); |
70 | 0 | VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). " |
71 | 0 | << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes() |
72 | 0 | << ". all block queues bytes from " << before_block_queues_bytes << " to " |
73 | 0 | << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() |
74 | 0 | << ". txn_id=" << txn_id << ", label=" << label |
75 | 0 | << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); |
76 | 0 | } |
77 | 0 | if (write_wal || config::group_commit_wait_replay_wal_finish) { |
78 | 0 | auto st = _v_wal_writer->write_wal(block.get()); |
79 | 0 | if (!st.ok()) { |
80 | 0 | _cancel_without_lock(st); |
81 | 0 | return st; |
82 | 0 | } |
83 | 0 | } |
84 | 0 | if (!runtime_state->is_cancelled() && status.ok() && |
85 | 0 | _all_block_queues_bytes->load(std::memory_order_relaxed) >= |
86 | 0 | config::group_commit_queue_mem_limit) { |
87 | 0 | group_commit_block_by_memory_counter << 1; |
88 | 0 | DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()); |
89 | 0 | _load_ids_to_write_dep[load_id]->block(); |
90 | 0 | VLOG_DEBUG << "block add_block for load_id=" << load_id |
91 | 0 | << ", memory=" << _all_block_queues_bytes->load(std::memory_order_relaxed) |
92 | 0 | << ". inner load_id=" << load_instance_id << ", label=" << label; |
93 | 0 | } |
94 | 0 | } |
95 | 0 | if (!_need_commit) { |
96 | 0 | if (_data_bytes >= _group_commit_data_bytes) { |
97 | 0 | VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label |
98 | 0 | << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; |
99 | 0 | _need_commit = true; |
100 | 0 | data_size_condition = true; |
101 | 0 | } |
102 | 0 | if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - |
103 | 0 | _start_time) |
104 | 0 | .count() >= _group_commit_interval_ms) { |
105 | 0 | VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label |
106 | 0 | << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; |
107 | 0 | _need_commit = true; |
108 | 0 | } |
109 | 0 | } |
110 | 0 | for (auto read_dep : _read_deps) { |
111 | 0 | read_dep->set_ready(); |
112 | 0 | VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id; |
113 | 0 | } |
114 | 0 | return Status::OK(); |
115 | 0 | } |
116 | | |
// Hands one buffered block to the internal group-commit consumer.
// Out params: *find_block is set when a block was dequeued into `block`;
// *eos becomes true once the queue is drained, commit is pending, and no
// producer load ids remain. When nothing is available and commit is not yet
// needed, the reader dependency `get_block_dep` is blocked until add_block()
// (or cancellation) wakes it.
Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool* find_block,
                                 bool* eos, std::shared_ptr<Dependency> get_block_dep) {
    *find_block = false;
    *eos = false;
    std::unique_lock l(mutex);
    if (runtime_state->is_cancelled() || !status.ok()) {
        // NOTE(review): when only `status` is bad (the consumer itself is not
        // cancelled), cancel_reason() may be an OK status yet is still passed
        // to _cancel_without_lock(), which expects a non-OK status — confirm.
        auto st = runtime_state->cancel_reason();
        _cancel_without_lock(st);
        return status;
    }
    // Time since this queue was created; reaching the interval forces a commit.
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::steady_clock::now() - _start_time)
                            .count();
    if (!_need_commit && duration >= _group_commit_interval_ms) {
        _need_commit = true;
    }
    if (_block_queue.empty()) {
        // Queue lingered far past the interval: log (rate-limited to one line
        // per 10s) so stuck group commits become visible.
        if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
            auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                                               std::chrono::steady_clock::now() - _last_print_time)
                                               .count();
            if (last_print_duration >= 10000) {
                _last_print_time = std::chrono::steady_clock::now();
                LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
                          << ", label=" << label << ", instance_id=" << load_instance_id
                          << ", duration=" << duration << ", load_ids=" << _get_load_ids();
            }
        }
        VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty";
        if (!_need_commit) {
            // Wait for a producer; add_block() sets the read deps ready again.
            get_block_dep->block();
            VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id;
        }
    } else {
        const BlockData block_data = _block_queue.front();
        block->swap(*block_data.block);
        *find_block = true;
        _block_queue.pop_front();
        // Release this block's bytes from the process-wide queue accounting.
        size_t before_block_queues_bytes = _all_block_queues_bytes->load();
        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
                   << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
                   << ". all block queues bytes from " << before_block_queues_bytes << " to "
                   << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
                   << ". txn_id=" << txn_id << ", label=" << label
                   << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
    }
    // End-of-stream only when nothing is buffered, commit was decided, and no
    // producer can still add blocks.
    if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
        *eos = true;
    } else {
        *eos = false;
    }
    // Memory dropped below the limit: unblock every producer throttled in add_block().
    if (_all_block_queues_bytes->load(std::memory_order_relaxed) <
        config::group_commit_queue_mem_limit) {
        for (auto& id : _load_ids_to_write_dep) {
            id.second->set_ready();
        }
        VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
                   << ". inner load_id=" << load_instance_id << ", label=" << label;
    }
    return Status::OK();
}
179 | | |
180 | 0 | Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) { |
181 | 0 | std::unique_lock l(mutex); |
182 | 0 | if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) { |
183 | 0 | _load_ids_to_write_dep[load_id]->set_always_ready(); |
184 | 0 | _load_ids_to_write_dep.erase(load_id); |
185 | 0 | for (auto read_dep : _read_deps) { |
186 | 0 | read_dep->set_ready(); |
187 | 0 | } |
188 | 0 | VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id; |
189 | 0 | return Status::OK(); |
190 | 0 | } |
191 | 0 | return Status::NotFound<false>("load_id=" + load_id.to_string() + |
192 | 0 | " not in block queue, label=" + label); |
193 | 0 | } |
194 | | |
195 | 0 | bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) { |
196 | 0 | std::unique_lock l(mutex); |
197 | 0 | return _load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end(); |
198 | 0 | } |
199 | | |
200 | | Status LoadBlockQueue::add_load_id(const UniqueId& load_id, |
201 | 0 | const std::shared_ptr<Dependency> put_block_dep) { |
202 | 0 | std::unique_lock l(mutex); |
203 | 0 | if (_need_commit) { |
204 | 0 | return Status::InternalError<false>("block queue is set need commit, id=" + |
205 | 0 | load_instance_id.to_string()); |
206 | 0 | } |
207 | 0 | _load_ids_to_write_dep[load_id] = put_block_dep; |
208 | 0 | group_commit_load_count.fetch_add(1); |
209 | 0 | return Status::OK(); |
210 | 0 | } |
211 | | |
212 | 0 | void LoadBlockQueue::cancel(const Status& st) { |
213 | 0 | DCHECK(!st.ok()); |
214 | 0 | std::unique_lock l(mutex); |
215 | 0 | _cancel_without_lock(st); |
216 | 0 | } |
217 | | |
218 | 0 | void LoadBlockQueue::_cancel_without_lock(const Status& st) { |
219 | 0 | LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label |
220 | 0 | << ", status=" << st.to_string(); |
221 | 0 | status = |
222 | 0 | Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string()); |
223 | 0 | size_t before_block_queues_bytes = _all_block_queues_bytes->load(); |
224 | 0 | while (!_block_queue.empty()) { |
225 | 0 | const BlockData& block_data = _block_queue.front().block; |
226 | 0 | _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); |
227 | 0 | _block_queue.pop_front(); |
228 | 0 | } |
229 | 0 | VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). " |
230 | 0 | << "all block queues bytes from " << before_block_queues_bytes << " to " |
231 | 0 | << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() |
232 | 0 | << ", txn_id=" << txn_id << ", label=" << label |
233 | 0 | << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); |
234 | 0 | for (auto& id : _load_ids_to_write_dep) { |
235 | 0 | id.second->set_always_ready(); |
236 | 0 | } |
237 | 0 | for (auto read_dep : _read_deps) { |
238 | 0 | read_dep->set_ready(); |
239 | 0 | } |
240 | 0 | VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids() |
241 | 0 | << ", inner load_id=" << load_instance_id; |
242 | 0 | } |
243 | | |
// Finds (or asynchronously creates) a LoadBlockQueue the given load can attach
// to. Matching requires an uncommitted queue with the same schema version and
// index count. If no queue matches, the load is parked in _create_plan_deps,
// its create_plan_dep is blocked, and (if not already in flight) a background
// task is submitted to plan a new internal group-commit load; that task's Defer
// wakes all parked loads when it finishes, successfully or not.
Status GroupCommitTable::get_first_block_load_queue(
        int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id,
        std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
        std::shared_ptr<MemTrackerLimiter> mem_tracker, std::shared_ptr<Dependency> create_plan_dep,
        std::shared_ptr<Dependency> put_block_dep) {
    DCHECK(table_id == _table_id);
    std::unique_lock l(_lock);
    auto try_to_get_matched_queue = [&]() -> Status {
        // First pass: the load may already be registered on a queue (e.g. after
        // the background plan task attached it).
        for (const auto& [_, inner_block_queue] : _load_block_queues) {
            if (inner_block_queue->contain_load_id(load_id)) {
                load_block_queue = inner_block_queue;
                return Status::OK();
            }
        }
        // Second pass: try to join any queue that has not been sealed for commit.
        for (const auto& [_, inner_block_queue] : _load_block_queues) {
            if (!inner_block_queue->need_commit()) {
                if (base_schema_version == inner_block_queue->schema_version &&
                    index_size == inner_block_queue->index_size) {
                    // add_load_id can still fail if the queue flips to
                    // need_commit concurrently; just keep scanning.
                    if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
                        load_block_queue = inner_block_queue;
                        return Status::OK();
                    }
                } else {
                    return Status::DataQualityError<false>(
                            "schema version not match, maybe a schema change is in process. "
                            "Please retry this load manually.");
                }
            }
        }
        return Status::InternalError<false>("can not get a block queue for table_id: " +
                                            std::to_string(_table_id) + _create_plan_failed_reason);
    };

    if (try_to_get_matched_queue().ok()) {
        return Status::OK();
    }
    // No queue available: park this load until a plan is created.
    create_plan_dep->block();
    _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep,
                                                       base_schema_version, index_size));
    if (!_is_creating_plan_fragment) {
        _is_creating_plan_fragment = true;
        // NOTE(review): the task lambda captures `[&]` (this plus enclosing
        // locals by reference) and runs after this function returns; it appears
        // to touch only members and the explicitly copied captures — confirm.
        RETURN_IF_ERROR(
                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
                    // Always wake every parked load and reset the in-flight
                    // flag, regardless of plan-creation outcome.
                    Defer defer {[&, dep = dep]() {
                        std::unique_lock l(_lock);
                        for (auto it : _create_plan_deps) {
                            std::get<0>(it.second)->set_ready();
                        }
                        _create_plan_deps.clear();
                        _is_creating_plan_fragment = false;
                    }};
                    auto st = _create_group_commit_load(be_exe_version, mem_tracker);
                    if (!st.ok()) {
                        LOG(WARNING) << "create group commit load error: " << st.to_string();
                        // Keep a truncated reason so later failures can report why.
                        _create_plan_failed_reason = ". create group commit load error: " +
                                                     st.to_string().substr(0, 300);
                    } else {
                        _create_plan_failed_reason = "";
                    }
                }));
    }
    // The plan task may have attached this load already; report the final state.
    return try_to_get_matched_queue();
}
307 | | |
308 | 0 | void GroupCommitTable::remove_load_id(const UniqueId& load_id) { |
309 | 0 | std::unique_lock l(_lock); |
310 | 0 | if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) { |
311 | 0 | _create_plan_deps.erase(load_id); |
312 | 0 | return; |
313 | 0 | } |
314 | 0 | for (const auto& [_, inner_block_queue] : _load_block_queues) { |
315 | 0 | if (inner_block_queue->remove_load_id(load_id).ok()) { |
316 | 0 | return; |
317 | 0 | } |
318 | 0 | } |
319 | 0 | } |
320 | | |
// Creates one internal group-commit load for this table: asks the FE (via the
// streamLoadPut RPC) to plan an "insert into ... select * from group_commit(...)"
// statement, registers a LoadBlockQueue for the returned fragment instance,
// attaches any parked loads with a matching schema, and finally submits the
// fragment for execution. If submission fails, the txn is finished (aborted)
// right here.
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
                                                   std::shared_ptr<MemTrackerLimiter> mem_tracker) {
    Status st = Status::OK();
    TStreamLoadPutResult result;
    std::string label;
    int64_t txn_id;
    TUniqueId instance_id;
    {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
        // Fresh load id; the label is derived from it with '-' replaced by '_'.
        UniqueId load_id = UniqueId::gen_uid();
        TUniqueId tload_id;
        tload_id.__set_hi(load_id.hi);
        tload_id.__set_lo(load_id.lo);
        std::regex reg("-");
        label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_");
        std::stringstream ss;
        ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label
           << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
        TStreamLoadPutRequest request;
        request.__set_load_sql(ss.str());
        request.__set_loadId(tload_id);
        request.__set_label(label);
        request.__set_token("group_commit"); // this is a fake, fe not check it now
        request.__set_max_filter_ratio(1.0);
        request.__set_strictMode(false);
        request.__set_partial_update(false);
        // this is an internal interface, use admin to pass the auth check
        request.__set_user("admin");
        if (_exec_env->cluster_info()->backend_id != 0) {
            request.__set_backend_id(_exec_env->cluster_info()->backend_id);
        } else {
            LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
        }
        // Ask the master FE to plan the load (10s RPC timeout).
        TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
        st = ThriftRpcHelper::rpc<FrontendServiceClient>(
                master_addr.hostname, master_addr.port,
                [&result, &request](FrontendServiceConnection& client) {
                    client->streamLoadPut(result, request);
                },
                10000L);
        if (!st.ok()) {
            LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string();
            return st;
        }
        st = Status::create<false>(result.status);
        // Group commit only supports the pipeline engine.
        if (st.ok() && !result.__isset.pipeline_params) {
            st = Status::InternalError("Non-pipeline is disabled!");
        }
        if (!st.ok()) {
            LOG(WARNING) << "create group commit load error, st=" << st.to_string();
            return st;
        }
        auto& pipeline_params = result.pipeline_params;
        auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version;
        auto index_size =
                pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size();
        DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id);
        txn_id = pipeline_params.txn_conf.txn_id;
        // A group-commit plan always has exactly one local fragment instance.
        DCHECK(pipeline_params.local_params.size() == 1);
        instance_id = pipeline_params.local_params[0].fragment_instance_id;
        VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id
                   << ", schema version=" << schema_version << ", index size=" << index_size
                   << ", label=" << label << ", txn_id=" << txn_id
                   << ", instance_id=" << print_id(instance_id);
        {
            // Register the queue (with its WAL) and attach parked loads whose
            // schema version and index count match.
            auto load_block_queue = std::make_shared<LoadBlockQueue>(
                    instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes,
                    result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
                    result.group_commit_data_bytes);
            RETURN_IF_ERROR(load_block_queue->create_wal(
                    _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
                    pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                    be_exe_version));

            std::unique_lock l(_lock);
            _load_block_queues.emplace(instance_id, load_block_queue);
            std::vector<UniqueId> success_load_ids;
            for (const auto& [id, load_info] : _create_plan_deps) {
                auto create_dep = std::get<0>(load_info);
                auto put_dep = std::get<1>(load_info);
                if (load_block_queue->schema_version == std::get<2>(load_info) &&
                    load_block_queue->index_size == std::get<3>(load_info)) {
                    if (load_block_queue->add_load_id(id, put_dep).ok()) {
                        create_dep->set_ready();
                        success_load_ids.emplace_back(id);
                    }
                }
            }
            for (const auto& id : success_load_ids) {
                _create_plan_deps.erase(id);
            }
        }
    }
    st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params);
    if (!st.ok()) {
        // Fragment never ran, so its finish callback won't fire: abort the txn here.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
        auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
                                                   st, nullptr);
        if (!finish_st.ok()) {
            LOG(WARNING) << "finish group commit error, label=" << label
                         << ", st=" << finish_st.to_string();
        }
    }
    return st;
}
426 | | |
427 | | Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_id, |
428 | | const std::string& label, int64_t txn_id, |
429 | | const TUniqueId& instance_id, Status& status, |
430 | 0 | RuntimeState* state) { |
431 | 0 | Status st; |
432 | 0 | Status result_status; |
433 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { |
434 | 0 | status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_status"); |
435 | 0 | }); |
436 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error", |
437 | 0 | { status = Status::InternalError("load_error"); }); |
438 | 0 | if (status.ok()) { |
439 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error", { |
440 | 0 | status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.commit_error"); |
441 | 0 | }); |
442 | | // commit txn |
443 | 0 | TLoadTxnCommitRequest request; |
444 | | // deprecated and should be removed in 3.1, use token instead |
445 | 0 | request.__set_auth_code(0); |
446 | 0 | request.__set_token(_exec_env->cluster_info()->curr_auth_token); |
447 | 0 | request.__set_db_id(db_id); |
448 | 0 | request.__set_table_id(table_id); |
449 | 0 | request.__set_txnId(txn_id); |
450 | 0 | request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); |
451 | 0 | request.__set_groupCommit(true); |
452 | 0 | request.__set_receiveBytes(state->num_bytes_load_total()); |
453 | 0 | if (_exec_env->cluster_info()->backend_id != 0) { |
454 | 0 | request.__set_backendId(_exec_env->cluster_info()->backend_id); |
455 | 0 | } else { |
456 | 0 | LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; |
457 | 0 | } |
458 | 0 | if (state) { |
459 | 0 | request.__set_commitInfos(state->tablet_commit_infos()); |
460 | 0 | } |
461 | 0 | TLoadTxnCommitResult result; |
462 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
463 | 0 | int retry_times = 0; |
464 | 0 | while (retry_times < config::mow_stream_load_commit_retry_times) { |
465 | 0 | st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
466 | 0 | master_addr.hostname, master_addr.port, |
467 | 0 | [&request, &result](FrontendServiceConnection& client) { |
468 | 0 | client->loadTxnCommit(result, request); |
469 | 0 | }, |
470 | 0 | config::txn_commit_rpc_timeout_ms); |
471 | 0 | result_status = Status::create(result.status); |
472 | | // DELETE_BITMAP_LOCK_ERROR will be retried |
473 | 0 | if (result_status.ok() || !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) { |
474 | 0 | break; |
475 | 0 | } |
476 | 0 | LOG_WARNING("Failed to commit txn on group commit") |
477 | 0 | .tag("label", label) |
478 | 0 | .tag("txn_id", txn_id) |
479 | 0 | .tag("retry_times", retry_times) |
480 | 0 | .error(result_status); |
481 | 0 | retry_times++; |
482 | 0 | } |
483 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error", |
484 | 0 | { result_status = Status::InternalError("commit_success_and_rpc_error"); }); |
485 | 0 | } else { |
486 | | // abort txn |
487 | 0 | TLoadTxnRollbackRequest request; |
488 | | // deprecated and should be removed in 3.1, use token instead |
489 | 0 | request.__set_auth_code(0); |
490 | 0 | request.__set_token(_exec_env->cluster_info()->curr_auth_token); |
491 | 0 | request.__set_db_id(db_id); |
492 | 0 | request.__set_txnId(txn_id); |
493 | 0 | request.__set_reason(status.to_string()); |
494 | 0 | TLoadTxnRollbackResult result; |
495 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
496 | 0 | st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
497 | 0 | master_addr.hostname, master_addr.port, |
498 | 0 | [&request, &result](FrontendServiceConnection& client) { |
499 | 0 | client->loadTxnRollback(result, request); |
500 | 0 | }); |
501 | 0 | result_status = Status::create<false>(result.status); |
502 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { |
503 | 0 | std ::string msg = "abort txn"; |
504 | 0 | LOG(INFO) << "debug promise set: " << msg; |
505 | 0 | ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( |
506 | 0 | Status ::InternalError(msg)); |
507 | 0 | }); |
508 | 0 | } |
509 | 0 | std::shared_ptr<LoadBlockQueue> load_block_queue; |
510 | 0 | { |
511 | 0 | std::lock_guard<std::mutex> l(_lock); |
512 | 0 | auto it = _load_block_queues.find(instance_id); |
513 | 0 | if (it != _load_block_queues.end()) { |
514 | 0 | load_block_queue = it->second; |
515 | 0 | if (!status.ok()) { |
516 | 0 | load_block_queue->cancel(status); |
517 | 0 | } |
518 | | //close wal |
519 | 0 | RETURN_IF_ERROR(load_block_queue->close_wal()); |
520 | | // notify sync mode loads |
521 | 0 | { |
522 | 0 | std::unique_lock l2(load_block_queue->mutex); |
523 | 0 | load_block_queue->process_finish = true; |
524 | 0 | for (auto dep : load_block_queue->dependencies) { |
525 | 0 | dep->set_always_ready(); |
526 | 0 | } |
527 | 0 | } |
528 | 0 | } |
529 | 0 | _load_block_queues.erase(instance_id); |
530 | 0 | } |
531 | | // status: exec_plan_fragment result |
532 | | // st: commit txn rpc status |
533 | | // result_status: commit txn result |
534 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st", { |
535 | 0 | st = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_st"); |
536 | 0 | }); |
537 | 0 | if (status.ok() && st.ok() && |
538 | 0 | (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) { |
539 | 0 | if (!config::group_commit_wait_replay_wal_finish) { |
540 | 0 | auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id); |
541 | 0 | if (!delete_st.ok()) { |
542 | 0 | LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string(); |
543 | 0 | } |
544 | 0 | } |
545 | 0 | } else { |
546 | 0 | std::string wal_path; |
547 | 0 | RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); |
548 | 0 | RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); |
549 | 0 | } |
550 | 0 | std::stringstream ss; |
551 | 0 | ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label |
552 | 0 | << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) |
553 | 0 | << ", exec_plan_fragment status=" << status.to_string() |
554 | 0 | << ", commit/abort txn rpc status=" << st.to_string() |
555 | 0 | << ", commit/abort txn status=" << result_status.to_string() |
556 | 0 | << ", this group commit includes " << load_block_queue->group_commit_load_count << " loads" |
557 | 0 | << ", flush because meet " |
558 | 0 | << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition" |
559 | 0 | << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); |
560 | 0 | if (state) { |
561 | 0 | if (!state->get_error_log_file_path().empty()) { |
562 | 0 | ss << ", error_url=" << state->get_error_log_file_path(); |
563 | 0 | } |
564 | 0 | if (!state->get_first_error_msg().empty()) { |
565 | 0 | ss << ", first_error_msg=" << state->get_first_error_msg(); |
566 | 0 | } |
567 | 0 | ss << ", rows=" << state->num_rows_load_success(); |
568 | 0 | } |
569 | 0 | LOG(INFO) << ss.str(); |
570 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", { |
571 | 0 | if (dp->param<int64_t>("table_id", -1) == table_id) { |
572 | 0 | std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string(); |
573 | 0 | LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug promise: " << msg; |
574 | 0 | ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( |
575 | 0 | Status ::InternalError(msg)); |
576 | 0 | } |
577 | 0 | };); |
578 | 0 | return st; |
579 | 0 | } |
580 | | |
581 | | Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, |
582 | | const std::string& label, int64_t txn_id, |
583 | 0 | const TPipelineFragmentParams& pipeline_params) { |
584 | 0 | auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) { |
585 | 0 | DCHECK(state); |
586 | 0 | auto finish_st = _finish_group_commit_load(db_id, table_id, label, txn_id, |
587 | 0 | state->fragment_instance_id(), *status, state); |
588 | 0 | if (!finish_st.ok()) { |
589 | 0 | LOG(WARNING) << "finish group commit error, label=" << label |
590 | 0 | << ", st=" << finish_st.to_string(); |
591 | 0 | } |
592 | 0 | }; |
593 | |
|
594 | 0 | TPipelineFragmentParamsList mocked; |
595 | 0 | return _exec_env->fragment_mgr()->exec_plan_fragment( |
596 | 0 | pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, mocked); |
597 | 0 | } |
598 | | |
599 | | Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, |
600 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
601 | 0 | std::shared_ptr<Dependency> get_block_dep) { |
602 | 0 | std::unique_lock l(_lock); |
603 | 0 | auto it = _load_block_queues.find(instance_id); |
604 | 0 | if (it == _load_block_queues.end()) { |
605 | 0 | return Status::InternalError("group commit load instance " + print_id(instance_id) + |
606 | 0 | " not found"); |
607 | 0 | } |
608 | 0 | load_block_queue = it->second; |
609 | 0 | load_block_queue->append_read_dependency(get_block_dep); |
610 | 0 | return Status::OK(); |
611 | 0 | } |
612 | | |
613 | 0 | GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { |
614 | 0 | static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool") |
615 | 0 | .set_min_threads(1) |
616 | 0 | .set_max_threads(config::group_commit_insert_threads) |
617 | 0 | .build(&_thread_pool)); |
618 | 0 | _all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0); |
619 | 0 | } |
620 | | |
621 | 0 | GroupCommitMgr::~GroupCommitMgr() { |
622 | 0 | LOG(INFO) << "GroupCommitMgr is destoried"; |
623 | 0 | } |
624 | | |
625 | 0 | void GroupCommitMgr::stop() { |
626 | 0 | _thread_pool->shutdown(); |
627 | 0 | LOG(INFO) << "GroupCommitMgr is stopped"; |
628 | 0 | } |
629 | | |
630 | | Status GroupCommitMgr::get_first_block_load_queue( |
631 | | int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t index_size, |
632 | | const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& load_block_queue, |
633 | | int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker, |
634 | 0 | std::shared_ptr<Dependency> create_plan_dep, std::shared_ptr<Dependency> put_block_dep) { |
635 | 0 | std::shared_ptr<GroupCommitTable> group_commit_table; |
636 | 0 | { |
637 | 0 | std::lock_guard wlock(_lock); |
638 | 0 | if (_table_map.find(table_id) == _table_map.end()) { |
639 | 0 | _table_map.emplace(table_id, std::make_shared<GroupCommitTable>( |
640 | 0 | _exec_env, _thread_pool.get(), db_id, table_id, |
641 | 0 | _all_block_queues_bytes)); |
642 | 0 | } |
643 | 0 | group_commit_table = _table_map[table_id]; |
644 | 0 | } |
645 | 0 | RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( |
646 | 0 | table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version, |
647 | 0 | mem_tracker, create_plan_dep, put_block_dep)); |
648 | 0 | return Status::OK(); |
649 | 0 | } |
650 | | |
651 | | Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, |
652 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
653 | 0 | std::shared_ptr<Dependency> get_block_dep) { |
654 | 0 | std::shared_ptr<GroupCommitTable> group_commit_table; |
655 | 0 | { |
656 | 0 | std::lock_guard<std::mutex> l(_lock); |
657 | 0 | auto it = _table_map.find(table_id); |
658 | 0 | if (it == _table_map.end()) { |
659 | 0 | return Status::NotFound("table_id: " + std::to_string(table_id) + |
660 | 0 | ", instance_id: " + print_id(instance_id) + " dose not exist"); |
661 | 0 | } |
662 | 0 | group_commit_table = it->second; |
663 | 0 | } |
664 | 0 | return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep); |
665 | 0 | } |
666 | | |
667 | 0 | void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) { |
668 | 0 | std::lock_guard wlock(_lock); |
669 | 0 | if (_table_map.find(table_id) != _table_map.end()) { |
670 | 0 | _table_map.find(table_id)->second->remove_load_id(load_id); |
671 | 0 | } |
672 | 0 | } |
673 | | |
674 | | Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, |
675 | | const std::string& import_label, WalManager* wal_manager, |
676 | 0 | std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) { |
677 | 0 | std::string real_label = config::group_commit_wait_replay_wal_finish |
678 | 0 | ? import_label + "_test_wait" |
679 | 0 | : import_label; |
680 | 0 | RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path( |
681 | 0 | db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION)); |
682 | 0 | _v_wal_writer = std::make_shared<VWalWriter>(db_id, tb_id, wal_id, real_label, wal_manager, |
683 | 0 | slot_desc, be_exe_version); |
684 | 0 | return _v_wal_writer->init(); |
685 | 0 | } |
686 | | |
687 | 0 | Status LoadBlockQueue::close_wal() { |
688 | 0 | if (_v_wal_writer != nullptr) { |
689 | 0 | RETURN_IF_ERROR(_v_wal_writer->close()); |
690 | 0 | } |
691 | 0 | return Status::OK(); |
692 | 0 | } |
693 | | |
694 | 0 | void LoadBlockQueue::append_dependency(std::shared_ptr<Dependency> finish_dep) { |
695 | 0 | std::lock_guard<std::mutex> lock(mutex); |
696 | | // If not finished, dependencies should be blocked. |
697 | 0 | if (!process_finish) { |
698 | 0 | finish_dep->block(); |
699 | 0 | dependencies.push_back(finish_dep); |
700 | 0 | } |
701 | 0 | } |
702 | | |
703 | 0 | void LoadBlockQueue::append_read_dependency(std::shared_ptr<Dependency> read_dep) { |
704 | 0 | std::lock_guard<std::mutex> lock(mutex); |
705 | 0 | _read_deps.push_back(read_dep); |
706 | 0 | } |
707 | | |
708 | 0 | bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) { |
709 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; }); |
710 | 0 | auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); |
711 | 0 | size_t available_bytes = 0; |
712 | 0 | { |
713 | 0 | Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes); |
714 | 0 | if (!st.ok()) { |
715 | 0 | LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string(); |
716 | 0 | } |
717 | 0 | } |
718 | 0 | if (estimated_wal_bytes < available_bytes) { |
719 | 0 | Status st = |
720 | 0 | wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0); |
721 | 0 | if (!st.ok()) { |
722 | 0 | LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string(); |
723 | 0 | } |
724 | 0 | return true; |
725 | 0 | } else { |
726 | 0 | return false; |
727 | 0 | } |
728 | 0 | } |
729 | | #include "common/compile_check_end.h" |
730 | | |
731 | | } // namespace doris |