be/src/load/group_commit/group_commit_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/group_commit/group_commit_mgr.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <chrono> |
24 | | |
25 | | #include "cloud/config.h" |
26 | | #include "common/compiler_util.h" |
27 | | #include "common/config.h" |
28 | | #include "common/status.h" |
29 | | #include "exec/pipeline/dependency.h" |
30 | | #include "runtime/exec_env.h" |
31 | | #include "runtime/fragment_mgr.h" |
32 | | #include "runtime/memory/mem_tracker_limiter.h" |
33 | | #include "runtime/thread_context.h" |
34 | | #include "util/client_cache.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/thrift_rpc_helper.h" |
37 | | #include "util/time.h" |
38 | | |
39 | | namespace doris { |
40 | | |
// Incremented by LoadBlockQueue::add_block() each time it finds the bytes held by all
// block queues at or above config::group_commit_queue_mem_limit after accepting a block.
41 | | bvar::Adder<uint64_t> group_commit_block_by_memory_counter("group_commit_block_by_memory_counter"); |
42 | | |
43 | 0 | std::string LoadBlockQueue::_get_load_ids() { |
44 | 0 | std::stringstream ss; |
45 | 0 | ss << "["; |
46 | 0 | for (auto& id : _load_ids_to_write_dep) { |
47 | 0 | ss << id.first.to_string() << ", "; |
48 | 0 | } |
49 | 0 | ss << "]"; |
50 | 0 | return ss.str(); |
51 | 0 | } |
52 | | |
53 | | Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr<Block> block, |
54 | 0 | bool write_wal, UniqueId& load_id) { |
55 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed", |
56 | 0 | { return Status::InternalError("LoadBlockQueue.add_block.failed"); }); |
57 | 0 | std::unique_lock l(mutex); |
58 | 0 | RETURN_IF_ERROR(status); |
59 | 0 | if (UNLIKELY(runtime_state->is_cancelled())) { |
60 | 0 | return runtime_state->cancel_reason(); |
61 | 0 | } |
62 | 0 | RETURN_IF_ERROR(status); |
63 | 0 | LOG(INFO) << "query_id: " << print_id(runtime_state->query_id()) |
64 | 0 | << ", add block rows=" << block->rows() << ", use group_commit label=" << label; |
65 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.add_block.block", DBUG_BLOCK); |
66 | 0 | if (block->rows() > 0) { |
67 | 0 | if (!config::group_commit_wait_replay_wal_finish) { |
68 | 0 | _block_queue.emplace_back(block); |
69 | 0 | _data_bytes += block->bytes(); |
70 | 0 | size_t before_block_queues_bytes = _all_block_queues_bytes->load(); |
71 | 0 | _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); |
72 | 0 | VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). " |
73 | 0 | << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes() |
74 | 0 | << ". all block queues bytes from " << before_block_queues_bytes << " to " |
75 | 0 | << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() |
76 | 0 | << ". txn_id=" << txn_id << ", label=" << label |
77 | 0 | << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); |
78 | 0 | } |
79 | 0 | if (write_wal || config::group_commit_wait_replay_wal_finish) { |
80 | 0 | auto st = _v_wal_writer->write_wal(block.get()); |
81 | 0 | if (!st.ok()) { |
82 | 0 | _cancel_without_lock(st); |
83 | 0 | return st; |
84 | 0 | } |
85 | 0 | } |
86 | 0 | if (!runtime_state->is_cancelled() && status.ok() && |
87 | 0 | _all_block_queues_bytes->load(std::memory_order_relaxed) >= |
88 | 0 | config::group_commit_queue_mem_limit) { |
89 | 0 | group_commit_block_by_memory_counter << 1; |
90 | 0 | auto dep_it = _load_ids_to_write_dep.find(load_id); |
91 | 0 | DCHECK(dep_it != _load_ids_to_write_dep.end()); |
92 | 0 | if (dep_it != _load_ids_to_write_dep.end() && dep_it->second) { |
93 | 0 | dep_it->second->block(); |
94 | 0 | VLOG_DEBUG << "block add_block for load_id=" << load_id << ", memory=" |
95 | 0 | << _all_block_queues_bytes->load(std::memory_order_relaxed) |
96 | 0 | << ". inner load_id=" << load_instance_id << ", label=" << label; |
97 | 0 | } |
98 | 0 | } |
99 | 0 | } |
100 | 0 | if (!_need_commit.load()) { |
101 | 0 | if (_data_bytes >= _group_commit_data_bytes) { |
102 | 0 | VLOG_DEBUG << "group commit meets commit condition for data size, label=" << label |
103 | 0 | << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; |
104 | 0 | _need_commit.store(true); |
105 | 0 | data_size_condition = true; |
106 | 0 | } |
107 | 0 | if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - |
108 | 0 | _start_time) |
109 | 0 | .count() >= _group_commit_interval_ms) { |
110 | 0 | VLOG_DEBUG << "group commit meets commit condition for time interval, label=" << label |
111 | 0 | << ", instance_id=" << load_instance_id << ", data_bytes=" << _data_bytes; |
112 | 0 | _need_commit.store(true); |
113 | 0 | } |
114 | 0 | } |
115 | 0 | for (auto read_dep : _read_deps) { |
116 | 0 | read_dep->set_ready(); |
117 | 0 | VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id; |
118 | 0 | } |
119 | 0 | return Status::OK(); |
120 | 0 | } |
121 | | |
122 | | Status LoadBlockQueue::get_block(RuntimeState* runtime_state, Block* block, bool* find_block, |
123 | 0 | bool* eos, std::shared_ptr<Dependency> get_block_dep) { |
124 | 0 | *find_block = false; |
125 | 0 | *eos = false; |
126 | 0 | std::unique_lock l(mutex); |
127 | 0 | if (runtime_state->is_cancelled() || !status.ok()) { |
128 | 0 | auto st = runtime_state->cancel_reason(); |
129 | 0 | _cancel_without_lock(st); |
130 | 0 | return status; |
131 | 0 | } |
132 | 0 | auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( |
133 | 0 | std::chrono::steady_clock::now() - _start_time) |
134 | 0 | .count(); |
135 | 0 | if (!_need_commit.load() && duration >= _group_commit_interval_ms) { |
136 | 0 | _need_commit.store(true); |
137 | 0 | } |
138 | 0 | if (_block_queue.empty()) { |
139 | 0 | if (_need_commit.load() && duration >= 10 * _group_commit_interval_ms) { |
140 | 0 | auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>( |
141 | 0 | std::chrono::steady_clock::now() - _last_print_time) |
142 | 0 | .count(); |
143 | 0 | if (last_print_duration >= 10000) { |
144 | 0 | _last_print_time = std::chrono::steady_clock::now(); |
145 | 0 | LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id |
146 | 0 | << ", label=" << label << ", instance_id=" << load_instance_id |
147 | 0 | << ", duration=" << duration << ", load_ids=" << _get_load_ids(); |
148 | 0 | } |
149 | 0 | } |
150 | 0 | VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty"; |
151 | 0 | if (!_need_commit.load()) { |
152 | 0 | get_block_dep->block(); |
153 | 0 | VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id; |
154 | 0 | } |
155 | 0 | } else { |
156 | 0 | const BlockData block_data = _block_queue.front(); |
157 | 0 | block->swap(*block_data.block); |
158 | 0 | *find_block = true; |
159 | 0 | _block_queue.pop_front(); |
160 | 0 | size_t before_block_queues_bytes = _all_block_queues_bytes->load(); |
161 | 0 | _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); |
162 | 0 | VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). " |
163 | 0 | << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes() |
164 | 0 | << ". all block queues bytes from " << before_block_queues_bytes << " to " |
165 | 0 | << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() |
166 | 0 | << ". txn_id=" << txn_id << ", label=" << label |
167 | 0 | << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); |
168 | 0 | } |
169 | 0 | if (_block_queue.empty() && _need_commit.load() && _load_ids_to_write_dep.empty()) { |
170 | 0 | *eos = true; |
171 | 0 | } else { |
172 | 0 | *eos = false; |
173 | 0 | } |
174 | 0 | if (_all_block_queues_bytes->load(std::memory_order_relaxed) < |
175 | 0 | config::group_commit_queue_mem_limit) { |
176 | 0 | for (auto& id : _load_ids_to_write_dep) { |
177 | 0 | id.second->set_ready(); |
178 | 0 | } |
179 | 0 | VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids() |
180 | 0 | << ". inner load_id=" << load_instance_id << ", label=" << label; |
181 | 0 | } |
182 | 0 | return Status::OK(); |
183 | 0 | } |
184 | | |
185 | 0 | Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) { |
186 | 0 | std::unique_lock l(mutex); |
187 | 0 | if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) { |
188 | 0 | _load_ids_to_write_dep[load_id]->set_always_ready(); |
189 | 0 | _load_ids_to_write_dep.erase(load_id); |
190 | 0 | for (auto read_dep : _read_deps) { |
191 | 0 | read_dep->set_ready(); |
192 | 0 | } |
193 | 0 | VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id; |
194 | 0 | return Status::OK(); |
195 | 0 | } |
196 | 0 | return Status::NotFound<false>("load_id=" + load_id.to_string() + |
197 | 0 | " not in block queue, label=" + label); |
198 | 0 | } |
199 | | |
200 | 0 | bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) { |
201 | 0 | std::unique_lock l(mutex); |
202 | 0 | return _load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end(); |
203 | 0 | } |
204 | | |
205 | | Status LoadBlockQueue::add_load_id(const UniqueId& load_id, |
206 | 0 | const std::shared_ptr<Dependency> put_block_dep) { |
207 | 0 | std::unique_lock l(mutex); |
208 | 0 | if (_need_commit.load() || !status.ok() || process_finish.load()) { |
209 | 0 | return Status::InternalError<false>( |
210 | 0 | "block queue cannot add load id, id=" + load_instance_id.to_string() + |
211 | 0 | ", need_commit=" + (_need_commit.load() ? "true" : "false") + |
212 | 0 | ", process_finish=" + (process_finish.load() ? "true" : "false") + |
213 | 0 | ", queue_status=" + status.to_string()); |
214 | 0 | } |
215 | 0 | _load_ids_to_write_dep[load_id] = put_block_dep; |
216 | 0 | group_commit_load_count.fetch_add(1); |
217 | 0 | return Status::OK(); |
218 | 0 | } |
219 | | |
220 | 0 | void LoadBlockQueue::cancel(const Status& st) { |
221 | 0 | DCHECK(!st.ok()); |
222 | 0 | std::unique_lock l(mutex); |
223 | 0 | _cancel_without_lock(st); |
224 | 0 | } |
225 | | |
226 | 0 | void LoadBlockQueue::_cancel_without_lock(const Status& st) { |
227 | 0 | LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label |
228 | 0 | << ", status=" << st.to_string(); |
229 | 0 | status = |
230 | 0 | Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string()); |
231 | 0 | size_t before_block_queues_bytes = _all_block_queues_bytes->load(); |
232 | 0 | while (!_block_queue.empty()) { |
233 | 0 | const BlockData& block_data = _block_queue.front(); |
234 | 0 | _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); |
235 | 0 | _block_queue.pop_front(); |
236 | 0 | } |
237 | 0 | VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). " |
238 | 0 | << "all block queues bytes from " << before_block_queues_bytes << " to " |
239 | 0 | << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() |
240 | 0 | << ", txn_id=" << txn_id << ", label=" << label |
241 | 0 | << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); |
242 | 0 | for (auto& id : _load_ids_to_write_dep) { |
243 | 0 | id.second->set_always_ready(); |
244 | 0 | } |
245 | 0 | for (auto read_dep : _read_deps) { |
246 | 0 | read_dep->set_ready(); |
247 | 0 | } |
248 | 0 | VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids() |
249 | 0 | << ", inner load_id=" << load_instance_id; |
250 | 0 | } |
251 | | |
252 | | Status GroupCommitTable::get_first_block_load_queue( |
253 | | int64_t table_id, int64_t base_schema_version, int64_t index_size, const UniqueId& load_id, |
254 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version, |
255 | 0 | std::shared_ptr<Dependency> create_plan_dep, std::shared_ptr<Dependency> put_block_dep) { |
256 | 0 | DCHECK(table_id == _table_id); |
257 | 0 | std::unique_lock l(_lock); |
258 | 0 | auto try_to_get_matched_queue = [&]() -> Status { |
259 | 0 | for (const auto& [_, inner_block_queue] : _load_block_queues) { |
260 | 0 | if (inner_block_queue->contain_load_id(load_id)) { |
261 | 0 | load_block_queue = inner_block_queue; |
262 | 0 | return Status::OK(); |
263 | 0 | } |
264 | 0 | } |
265 | 0 | for (const auto& [_, inner_block_queue] : _load_block_queues) { |
266 | 0 | if (!inner_block_queue->need_commit()) { |
267 | 0 | if (base_schema_version == inner_block_queue->schema_version && |
268 | 0 | index_size == inner_block_queue->index_size) { |
269 | 0 | if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) { |
270 | 0 | load_block_queue = inner_block_queue; |
271 | 0 | return Status::OK(); |
272 | 0 | } |
273 | 0 | } else { |
274 | 0 | return Status::DataQualityError<false>( |
275 | 0 | "schema version not match, maybe a schema change is in process. " |
276 | 0 | "Please retry this load manually."); |
277 | 0 | } |
278 | 0 | } |
279 | 0 | } |
280 | 0 | return Status::InternalError<false>("can not get a block queue for table_id: " + |
281 | 0 | std::to_string(_table_id) + _create_plan_failed_reason); |
282 | 0 | }; |
283 | |
|
284 | 0 | if (try_to_get_matched_queue().ok()) { |
285 | 0 | return Status::OK(); |
286 | 0 | } |
287 | 0 | create_plan_dep->block(); |
288 | 0 | _create_plan_be_exe_version = be_exe_version; |
289 | 0 | if (_create_plan_deps.empty()) { |
290 | 0 | _create_plan_start_time_ms = MonotonicMillis(); |
291 | 0 | } |
292 | 0 | _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, put_block_dep, |
293 | 0 | base_schema_version, index_size)); |
294 | 0 | [[maybe_unused]] auto submit_st = _submit_create_group_commit_load(); |
295 | 0 | return try_to_get_matched_queue(); |
296 | 0 | } |
297 | | |
298 | 0 | Status GroupCommitTable::submit_create_group_commit_load() { |
299 | 0 | std::unique_lock l(_lock); |
300 | 0 | if (_create_plan_deps.empty()) { |
301 | 0 | return Status::OK(); |
302 | 0 | } |
303 | 0 | return _submit_create_group_commit_load(); |
304 | 0 | } |
305 | | |
306 | 0 | Status GroupCommitTable::_submit_create_group_commit_load() { |
307 | 0 | if (_is_creating_plan_fragment) { |
308 | 0 | return Status::OK(); |
309 | 0 | } |
310 | | |
311 | 0 | int64_t timeout_ms = config::group_commit_create_plan_timeout_ms; |
312 | 0 | if (timeout_ms > 0 && !_create_plan_deps.empty()) { |
313 | 0 | int64_t now_ms = MonotonicMillis(); |
314 | 0 | if (_create_plan_start_time_ms > 0 && now_ms - _create_plan_start_time_ms > timeout_ms) { |
315 | 0 | _create_plan_failed_reason = |
316 | 0 | ". group commit create plan timeout after " + std::to_string(timeout_ms) + "ms"; |
317 | 0 | for (const auto& [id, load_info] : _create_plan_deps) { |
318 | 0 | std::get<0>(load_info)->set_ready(); |
319 | 0 | } |
320 | 0 | _create_plan_deps.clear(); |
321 | 0 | _create_plan_start_time_ms = 0; |
322 | 0 | return Status::OK(); |
323 | 0 | } |
324 | 0 | } |
325 | | |
326 | 0 | auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker(); |
327 | 0 | int be_exe_version = _create_plan_be_exe_version; |
328 | 0 | _is_creating_plan_fragment = true; |
329 | 0 | auto submit_st = _thread_pool->submit_func([&, be_exe_version, mem_tracker] { |
330 | 0 | std::shared_ptr<LoadBlockQueue> created_load_block_queue; |
331 | 0 | Status create_group_commit_st = Status::OK(); |
332 | 0 | std::string create_plan_failed_reason; |
333 | 0 | Defer defer {[&]() { |
334 | 0 | bool need_resubmit = !create_group_commit_st.ok(); |
335 | 0 | std::unique_lock l(_lock); |
336 | 0 | _is_creating_plan_fragment = false; |
337 | 0 | _create_plan_failed_reason = create_plan_failed_reason; |
338 | 0 | if (created_load_block_queue && create_group_commit_st.ok() && |
339 | 0 | !created_load_block_queue->need_commit()) { |
340 | 0 | std::vector<UniqueId> success_load_ids; |
341 | 0 | for (const auto& [id, load_info] : _create_plan_deps) { |
342 | 0 | auto create_dep = std::get<0>(load_info); |
343 | 0 | auto put_dep = std::get<1>(load_info); |
344 | 0 | if (created_load_block_queue->schema_version == std::get<2>(load_info) && |
345 | 0 | created_load_block_queue->index_size == std::get<3>(load_info)) { |
346 | 0 | auto st = created_load_block_queue->add_load_id(id, put_dep); |
347 | 0 | if (!st.ok()) { |
348 | 0 | LOG(WARNING) << "failed to add pending load_id into created " |
349 | 0 | "group commit queue, load_id=" |
350 | 0 | << id << ", label=" << created_load_block_queue->label |
351 | 0 | << ", status=" << st.to_string(); |
352 | 0 | need_resubmit = true; |
353 | 0 | } else { |
354 | 0 | create_dep->set_ready(); |
355 | 0 | success_load_ids.emplace_back(id); |
356 | 0 | } |
357 | 0 | } else if (created_load_block_queue->schema_version > std::get<2>(load_info) || |
358 | 0 | (created_load_block_queue->schema_version == |
359 | 0 | std::get<2>(load_info) && |
360 | 0 | created_load_block_queue->index_size != std::get<3>(load_info))) { |
361 | | // schema version mismatch: |
362 | | // 1. the schema version of created load block queue is newer than the load request |
363 | | // 2. the index size is not equal |
364 | | // set ready for the load request to let it fail |
365 | 0 | create_dep->set_ready(); |
366 | 0 | success_load_ids.emplace_back(id); |
367 | 0 | } |
368 | 0 | } |
369 | 0 | for (const auto& id : success_load_ids) { |
370 | 0 | _create_plan_deps.erase(id); |
371 | 0 | } |
372 | 0 | if (_create_plan_deps.empty()) { |
373 | 0 | _create_plan_start_time_ms = 0; |
374 | 0 | } |
375 | 0 | } |
376 | 0 | if (!_create_plan_deps.empty()) { |
377 | 0 | need_resubmit = true; |
378 | 0 | } |
379 | 0 | if (need_resubmit && _group_commit_mgr) { |
380 | 0 | LOG(INFO) << "resubmit create group commit load task for table: " << _table_id; |
381 | 0 | _group_commit_mgr->add_need_create_plan_table(_table_id); |
382 | 0 | } |
383 | 0 | }}; |
384 | 0 | create_group_commit_st = |
385 | 0 | _create_group_commit_load(be_exe_version, mem_tracker, created_load_block_queue); |
386 | 0 | if (!create_group_commit_st.ok()) { |
387 | 0 | LOG(WARNING) << "create group commit load error: " |
388 | 0 | << create_group_commit_st.to_string(); |
389 | 0 | create_plan_failed_reason = ". create group commit load error: " + |
390 | 0 | create_group_commit_st.to_string().substr(0, 300); |
391 | 0 | } else { |
392 | 0 | create_plan_failed_reason = ""; |
393 | 0 | } |
394 | 0 | }); |
395 | 0 | if (!submit_st.ok()) { |
396 | 0 | _is_creating_plan_fragment = false; |
397 | 0 | _create_plan_failed_reason = |
398 | 0 | ". create group commit load error: submit create group commit load task failed: " + |
399 | 0 | submit_st.to_string().substr(0, 300); |
400 | 0 | for (const auto& [id, load_info] : _create_plan_deps) { |
401 | 0 | std::get<0>(load_info)->set_ready(); |
402 | 0 | } |
403 | 0 | _create_plan_deps.clear(); |
404 | 0 | LOG(WARNING) << "submit create group commit load task for table: " << _table_id |
405 | 0 | << ", error: " << submit_st.to_string(); |
406 | 0 | } |
407 | 0 | return submit_st; |
408 | 0 | } |
409 | | |
410 | 0 | void GroupCommitTable::remove_load_id(const UniqueId& load_id) { |
411 | 0 | std::unique_lock l(_lock); |
412 | 0 | if (_create_plan_deps.find(load_id) != _create_plan_deps.end()) { |
413 | 0 | _create_plan_deps.erase(load_id); |
414 | 0 | return; |
415 | 0 | } |
416 | 0 | for (const auto& [_, inner_block_queue] : _load_block_queues) { |
417 | 0 | if (inner_block_queue->remove_load_id(load_id).ok()) { |
418 | 0 | return; |
419 | 0 | } |
420 | 0 | } |
421 | 0 | } |
422 | | |
423 | | Status GroupCommitTable::_create_group_commit_load( |
424 | | int be_exe_version, const std::shared_ptr<MemTrackerLimiter>& mem_tracker, |
425 | 0 | std::shared_ptr<LoadBlockQueue>& created_load_block_queue) { |
426 | 0 | Status st = Status::OK(); |
427 | 0 | TStreamLoadPutResult result; |
428 | 0 | std::string label; |
429 | 0 | int64_t txn_id; |
430 | 0 | TUniqueId instance_id; |
431 | 0 | { |
432 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); |
433 | 0 | UniqueId load_id = UniqueId::gen_uid(); |
434 | 0 | TUniqueId tload_id; |
435 | 0 | tload_id.__set_hi(load_id.hi); |
436 | 0 | tload_id.__set_lo(load_id.lo); |
437 | 0 | std::regex reg("-"); |
438 | 0 | label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); |
439 | 0 | std::stringstream ss; |
440 | 0 | ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label |
441 | 0 | << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")"; |
442 | 0 | TStreamLoadPutRequest request; |
443 | 0 | request.__set_load_sql(ss.str()); |
444 | 0 | request.__set_loadId(tload_id); |
445 | 0 | request.__set_label(label); |
446 | 0 | request.__set_token("group_commit"); // this is a fake, fe not check it now |
447 | 0 | request.__set_max_filter_ratio(1.0); |
448 | 0 | request.__set_strictMode(false); |
449 | 0 | request.__set_partial_update(false); |
450 | | // this is an internal interface, use admin to pass the auth check |
451 | 0 | request.__set_user("admin"); |
452 | 0 | if (_exec_env->cluster_info()->backend_id != 0) { |
453 | 0 | request.__set_backend_id(_exec_env->cluster_info()->backend_id); |
454 | 0 | } else { |
455 | 0 | LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; |
456 | 0 | } |
457 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
458 | 0 | st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
459 | 0 | master_addr.hostname, master_addr.port, |
460 | 0 | [&result, &request](FrontendServiceConnection& client) { |
461 | 0 | client->streamLoadPut(result, request); |
462 | 0 | }, |
463 | 0 | 10000L); |
464 | 0 | if (!st.ok()) { |
465 | 0 | LOG(WARNING) << "create group commit load rpc error, st=" << st.to_string(); |
466 | 0 | return st; |
467 | 0 | } |
468 | 0 | st = Status::create<false>(result.status); |
469 | 0 | if (st.ok() && !result.__isset.pipeline_params) { |
470 | 0 | st = Status::InternalError("Non-pipeline is disabled!"); |
471 | 0 | } |
472 | 0 | if (!st.ok()) { |
473 | 0 | LOG(WARNING) << "create group commit load error, st=" << st.to_string(); |
474 | 0 | return st; |
475 | 0 | } |
476 | 0 | auto& pipeline_params = result.pipeline_params; |
477 | 0 | auto schema_version = pipeline_params.fragment.output_sink.olap_table_sink.schema.version; |
478 | 0 | auto index_size = |
479 | 0 | pipeline_params.fragment.output_sink.olap_table_sink.schema.indexes.size(); |
480 | 0 | DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id); |
481 | 0 | txn_id = pipeline_params.txn_conf.txn_id; |
482 | 0 | DCHECK(pipeline_params.local_params.size() == 1); |
483 | 0 | instance_id = pipeline_params.local_params[0].fragment_instance_id; |
484 | 0 | VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id |
485 | 0 | << ", schema version=" << schema_version << ", index size=" << index_size |
486 | 0 | << ", label=" << label << ", txn_id=" << txn_id |
487 | 0 | << ", instance_id=" << print_id(instance_id); |
488 | 0 | { |
489 | 0 | auto load_block_queue = std::make_shared<LoadBlockQueue>( |
490 | 0 | instance_id, label, txn_id, schema_version, index_size, _all_block_queues_bytes, |
491 | 0 | result.wait_internal_group_commit_finish, result.group_commit_interval_ms, |
492 | 0 | result.group_commit_data_bytes); |
493 | 0 | RETURN_IF_ERROR(load_block_queue->create_wal( |
494 | 0 | _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(), |
495 | 0 | pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, |
496 | 0 | be_exe_version)); |
497 | 0 | created_load_block_queue = load_block_queue; |
498 | |
|
499 | 0 | std::unique_lock l(_lock); |
500 | 0 | _load_block_queues.emplace(instance_id, load_block_queue); |
501 | 0 | std::vector<UniqueId> success_load_ids; |
502 | 0 | for (const auto& [id, load_info] : _create_plan_deps) { |
503 | 0 | auto create_dep = std::get<0>(load_info); |
504 | 0 | auto put_dep = std::get<1>(load_info); |
505 | 0 | if (load_block_queue->schema_version == std::get<2>(load_info) && |
506 | 0 | load_block_queue->index_size == std::get<3>(load_info)) { |
507 | 0 | if (load_block_queue->add_load_id(id, put_dep).ok()) { |
508 | 0 | create_dep->set_ready(); |
509 | 0 | success_load_ids.emplace_back(id); |
510 | 0 | } |
511 | 0 | } else if (load_block_queue->schema_version > std::get<2>(load_info) || |
512 | 0 | (load_block_queue->schema_version == std::get<2>(load_info) && |
513 | 0 | load_block_queue->index_size != std::get<3>(load_info))) { |
514 | | // schema version mismatch: |
515 | | // 1. the schema version of created load block queue is newer than the load request |
516 | | // 2. the index size is not equal |
517 | | // set ready for the load request to let it fail |
518 | 0 | create_dep->set_ready(); |
519 | 0 | success_load_ids.emplace_back(id); |
520 | 0 | } |
521 | 0 | } |
522 | 0 | for (const auto& id : success_load_ids) { |
523 | 0 | _create_plan_deps.erase(id); |
524 | 0 | } |
525 | 0 | if (_create_plan_deps.empty()) { |
526 | 0 | _create_plan_start_time_ms = 0; |
527 | 0 | } |
528 | 0 | } |
529 | 0 | } |
530 | 0 | st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params); |
531 | 0 | if (!st.ok()) { |
532 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker); |
533 | 0 | auto finish_st = _finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, |
534 | 0 | st, nullptr); |
535 | 0 | if (!finish_st.ok()) { |
536 | 0 | LOG(WARNING) << "finish group commit error, label=" << label |
537 | 0 | << ", st=" << finish_st.to_string(); |
538 | 0 | } |
539 | 0 | } |
540 | 0 | return st; |
541 | 0 | } |
542 | | |
543 | | Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_id, |
544 | | const std::string& label, int64_t txn_id, |
545 | | const TUniqueId& instance_id, Status& status, |
546 | 0 | RuntimeState* state) { |
547 | 0 | Status st; |
548 | 0 | Status result_status; |
549 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { |
550 | 0 | status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_status"); |
551 | 0 | }); |
552 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.load_error", |
553 | 0 | { status = Status::InternalError("load_error"); }); |
554 | 0 | if (status.ok()) { |
555 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_error", { |
556 | 0 | status = Status::InternalError("LoadBlockQueue._finish_group_commit_load.commit_error"); |
557 | 0 | }); |
558 | | // commit txn |
559 | 0 | TLoadTxnCommitRequest request; |
560 | | // deprecated and should be removed in 3.1, use token instead |
561 | 0 | request.__set_auth_code(0); |
562 | 0 | request.__set_token(_exec_env->cluster_info()->curr_auth_token); |
563 | 0 | request.__set_db_id(db_id); |
564 | 0 | request.__set_table_id(table_id); |
565 | 0 | request.__set_txnId(txn_id); |
566 | 0 | request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms); |
567 | 0 | request.__set_groupCommit(true); |
568 | 0 | request.__set_receiveBytes(state->num_bytes_load_total()); |
569 | 0 | if (_exec_env->cluster_info()->backend_id != 0) { |
570 | 0 | request.__set_backendId(_exec_env->cluster_info()->backend_id); |
571 | 0 | } else { |
572 | 0 | LOG(WARNING) << "_exec_env->cluster_info not set backend_id"; |
573 | 0 | } |
574 | 0 | if (state) { |
575 | 0 | request.__set_commitInfos(state->tablet_commit_infos()); |
576 | 0 | } |
577 | 0 | TLoadTxnCommitResult result; |
578 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
579 | 0 | int retry_times = 0; |
580 | 0 | while (retry_times < config::mow_stream_load_commit_retry_times) { |
581 | 0 | st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
582 | 0 | master_addr.hostname, master_addr.port, |
583 | 0 | [&request, &result](FrontendServiceConnection& client) { |
584 | 0 | client->loadTxnCommit(result, request); |
585 | 0 | }, |
586 | 0 | config::txn_commit_rpc_timeout_ms); |
587 | 0 | if (st.ok()) { |
588 | 0 | result_status = Status::create(result.status); |
589 | | // DELETE_BITMAP_LOCK_ERROR will be retried |
590 | 0 | if (result_status.ok() || |
591 | 0 | !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) { |
592 | 0 | break; |
593 | 0 | } |
594 | 0 | LOG_WARNING("Failed to commit txn on group commit") |
595 | 0 | .tag("label", label) |
596 | 0 | .tag("txn_id", txn_id) |
597 | 0 | .tag("retry_times", retry_times) |
598 | 0 | .error(result_status); |
599 | 0 | } else { |
600 | 0 | LOG_WARNING("Failed to commit txn on group commit") |
601 | 0 | .tag("label", label) |
602 | 0 | .tag("txn_id", txn_id) |
603 | 0 | .tag("retry_times", retry_times) |
604 | 0 | .error(st); |
605 | 0 | } |
606 | 0 | retry_times++; |
607 | 0 | } |
608 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error", |
609 | 0 | { result_status = Status::InternalError("commit_success_and_rpc_error"); }); |
610 | 0 | } else { |
611 | | // abort txn |
612 | 0 | TLoadTxnRollbackRequest request; |
613 | | // deprecated and should be removed in 3.1, use token instead |
614 | 0 | request.__set_auth_code(0); |
615 | 0 | request.__set_token(_exec_env->cluster_info()->curr_auth_token); |
616 | 0 | request.__set_db_id(db_id); |
617 | 0 | request.__set_txnId(txn_id); |
618 | 0 | request.__set_reason(status.to_string()); |
619 | 0 | TLoadTxnRollbackResult result; |
620 | 0 | TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr; |
621 | 0 | st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
622 | 0 | master_addr.hostname, master_addr.port, |
623 | 0 | [&request, &result](FrontendServiceConnection& client) { |
624 | 0 | client->loadTxnRollback(result, request); |
625 | 0 | }); |
626 | 0 | if (st.ok()) { |
627 | 0 | result_status = Status::create<false>(result.status); |
628 | 0 | } |
629 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", { |
630 | 0 | std ::string msg = "abort txn"; |
631 | 0 | LOG(INFO) << "debug promise set: " << msg; |
632 | 0 | ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( |
633 | 0 | Status ::InternalError(msg)); |
634 | 0 | }); |
635 | 0 | } |
636 | 0 | std::shared_ptr<LoadBlockQueue> load_block_queue; |
637 | 0 | { |
638 | 0 | std::lock_guard<std::mutex> l(_lock); |
639 | 0 | auto it = _load_block_queues.find(instance_id); |
640 | 0 | if (it != _load_block_queues.end()) { |
641 | 0 | load_block_queue = it->second; |
642 | 0 | if (!status.ok()) { |
643 | 0 | load_block_queue->cancel(status); |
644 | 0 | } |
645 | | //close wal |
646 | 0 | RETURN_IF_ERROR(load_block_queue->close_wal()); |
647 | | // notify sync mode loads |
648 | 0 | { |
649 | 0 | std::unique_lock l2(load_block_queue->mutex); |
650 | 0 | load_block_queue->process_finish = true; |
651 | 0 | for (auto dep : load_block_queue->dependencies) { |
652 | 0 | dep->set_always_ready(); |
653 | 0 | } |
654 | 0 | } |
655 | 0 | } |
656 | 0 | _load_block_queues.erase(instance_id); |
657 | 0 | } |
658 | 0 | if (!load_block_queue) { |
659 | 0 | LOG(WARNING) << "finish group commit can not find load block queue, label=" << label |
660 | 0 | << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); |
661 | 0 | } |
662 | | // status: exec_plan_fragment result |
663 | | // st: commit txn rpc status |
664 | | // result_status: commit txn result |
665 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_st", { |
666 | 0 | st = Status::InternalError("LoadBlockQueue._finish_group_commit_load.err_st"); |
667 | 0 | }); |
668 | 0 | if (status.ok() && st.ok() && |
669 | 0 | (result_status.ok() || result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) { |
670 | 0 | if (!config::group_commit_wait_replay_wal_finish) { |
671 | 0 | auto delete_st = _exec_env->wal_mgr()->delete_wal(table_id, txn_id); |
672 | 0 | if (!delete_st.ok()) { |
673 | 0 | LOG(WARNING) << "fail to delete wal " << txn_id << ", st=" << delete_st.to_string(); |
674 | 0 | } |
675 | 0 | } |
676 | 0 | } else { |
677 | 0 | std::string wal_path; |
678 | 0 | RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path)); |
679 | 0 | RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id, txn_id, wal_path)); |
680 | 0 | } |
681 | 0 | std::stringstream ss; |
682 | 0 | ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label |
683 | 0 | << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) |
684 | 0 | << ", exec_plan_fragment status=" << status.to_string() |
685 | 0 | << ", commit/abort txn rpc status=" << st.to_string() |
686 | 0 | << ", commit/abort txn status=" << result_status.to_string(); |
687 | 0 | if (load_block_queue) { |
688 | 0 | ss << ", this group commit includes " << load_block_queue->group_commit_load_count |
689 | 0 | << " loads, flush because meet " |
690 | 0 | << (load_block_queue->data_size_condition ? "data size " : "time ") << "condition"; |
691 | 0 | } else { |
692 | 0 | ss << ", load block queue is missing when finishing group commit"; |
693 | 0 | } |
694 | 0 | ss << ", wal space info:" << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); |
695 | 0 | if (state) { |
696 | 0 | if (!state->get_error_log_file_path().empty()) { |
697 | 0 | ss << ", error_url=" << state->get_error_log_file_path(); |
698 | 0 | } |
699 | 0 | if (!state->get_first_error_msg().empty()) { |
700 | 0 | ss << ", first_error_msg=" << state->get_first_error_msg(); |
701 | 0 | } |
702 | 0 | ss << ", rows=" << state->num_rows_load_success(); |
703 | 0 | } |
704 | 0 | LOG(INFO) << ss.str(); |
705 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.get_wal_back_pressure_msg", { |
706 | 0 | if (dp->param<int64_t>("table_id", -1) == table_id) { |
707 | 0 | std ::string msg = _exec_env->wal_mgr()->get_wal_dirs_info_string(); |
708 | 0 | LOG(INFO) << "table_id" << std::to_string(table_id) << " set debug promise: " << msg; |
709 | 0 | ExecEnv::GetInstance()->group_commit_mgr()->debug_promise.set_value( |
710 | 0 | Status ::InternalError(msg)); |
711 | 0 | } |
712 | 0 | };); |
713 | 0 | return st; |
714 | 0 | } |
715 | | |
716 | | Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, |
717 | | const std::string& label, int64_t txn_id, |
718 | 0 | const TPipelineFragmentParams& pipeline_params) { |
719 | 0 | auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) { |
720 | 0 | DCHECK(state); |
721 | 0 | auto finish_st = _finish_group_commit_load(db_id, table_id, label, txn_id, |
722 | 0 | state->fragment_instance_id(), *status, state); |
723 | 0 | if (!finish_st.ok()) { |
724 | 0 | LOG(WARNING) << "finish group commit error, label=" << label |
725 | 0 | << ", st=" << finish_st.to_string(); |
726 | 0 | } |
727 | 0 | }; |
728 | |
|
729 | 0 | TPipelineFragmentParamsList mocked; |
730 | 0 | return _exec_env->fragment_mgr()->exec_plan_fragment( |
731 | 0 | pipeline_params, QuerySource::GROUP_COMMIT_LOAD, finish_cb, mocked); |
732 | 0 | } |
733 | | |
734 | | Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, |
735 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
736 | 0 | std::shared_ptr<Dependency> get_block_dep) { |
737 | 0 | std::unique_lock l(_lock); |
738 | 0 | auto it = _load_block_queues.find(instance_id); |
739 | 0 | if (it == _load_block_queues.end()) { |
740 | 0 | return Status::InternalError("group commit load instance " + print_id(instance_id) + |
741 | 0 | " not found"); |
742 | 0 | } |
743 | 0 | load_block_queue = it->second; |
744 | 0 | load_block_queue->append_read_dependency(get_block_dep); |
745 | 0 | return Status::OK(); |
746 | 0 | } |
747 | | |
748 | 0 | GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { |
749 | 0 | static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool") |
750 | 0 | .set_min_threads(1) |
751 | 0 | .set_max_threads(config::group_commit_insert_threads) |
752 | 0 | .build(&_thread_pool)); |
753 | 0 | _all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0); |
754 | 0 | _group_commit_mem_tracker = |
755 | 0 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "GroupCommit"); |
756 | 0 | _create_plan_thread = std::thread(&GroupCommitMgr::_create_plan_worker, this); |
757 | 0 | } |
758 | | |
759 | 0 | GroupCommitMgr::~GroupCommitMgr() { |
760 | 0 | stop(); |
761 | 0 | LOG(INFO) << "GroupCommitMgr is destoried"; |
762 | 0 | } |
763 | | |
764 | 0 | void GroupCommitMgr::stop() { |
765 | 0 | { |
766 | 0 | std::lock_guard<std::mutex> l(_need_create_plan_lock); |
767 | 0 | if (_stopped) { |
768 | 0 | return; |
769 | 0 | } |
770 | 0 | _stopped = true; |
771 | 0 | } |
772 | 0 | _need_create_plan_cv.notify_all(); |
773 | 0 | if (_create_plan_thread.joinable()) { |
774 | 0 | _create_plan_thread.join(); |
775 | 0 | } |
776 | 0 | _thread_pool->shutdown(); |
777 | 0 | LOG(INFO) << "GroupCommitMgr is stopped"; |
778 | 0 | } |
779 | | |
780 | 0 | void GroupCommitMgr::add_need_create_plan_table(int64_t table_id) { |
781 | 0 | { |
782 | 0 | std::lock_guard<std::mutex> l(_need_create_plan_lock); |
783 | 0 | if (_stopped) { |
784 | 0 | return; |
785 | 0 | } |
786 | 0 | _need_create_plan_tables.insert(table_id); |
787 | 0 | } |
788 | 0 | _need_create_plan_cv.notify_one(); |
789 | 0 | } |
790 | | |
791 | 0 | void GroupCommitMgr::_create_plan_worker() { |
792 | 0 | SCOPED_INIT_THREAD_CONTEXT(); |
793 | 0 | while (true) { |
794 | 0 | std::set<int64_t> need_create_plan_tables; |
795 | 0 | { |
796 | 0 | std::unique_lock<std::mutex> l(_need_create_plan_lock); |
797 | 0 | _need_create_plan_cv.wait( |
798 | 0 | l, [this] { return _stopped || !_need_create_plan_tables.empty(); }); |
799 | 0 | if (_stopped && _need_create_plan_tables.empty()) { |
800 | 0 | return; |
801 | 0 | } |
802 | 0 | need_create_plan_tables.swap(_need_create_plan_tables); |
803 | 0 | } |
804 | 0 | for (const auto table_id : need_create_plan_tables) { |
805 | 0 | std::shared_ptr<GroupCommitTable> group_commit_table; |
806 | 0 | { |
807 | 0 | std::lock_guard<std::mutex> l(_lock); |
808 | 0 | auto it = _table_map.find(table_id); |
809 | 0 | if (it == _table_map.end()) { |
810 | 0 | continue; |
811 | 0 | } |
812 | 0 | group_commit_table = it->second; |
813 | 0 | } |
814 | 0 | auto st = group_commit_table->submit_create_group_commit_load(); |
815 | 0 | if (!st.ok()) { |
816 | 0 | LOG(WARNING) << "submit create group commit load task from worker for table: " |
817 | 0 | << table_id << ", error: " << st.to_string(); |
818 | 0 | } |
819 | 0 | } |
820 | 0 | } |
821 | 0 | } |
822 | | |
823 | | Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id, |
824 | | int64_t base_schema_version, int64_t index_size, |
825 | | const UniqueId& load_id, |
826 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
827 | | int be_exe_version, |
828 | | std::shared_ptr<Dependency> create_plan_dep, |
829 | 0 | std::shared_ptr<Dependency> put_block_dep) { |
830 | 0 | std::shared_ptr<GroupCommitTable> group_commit_table; |
831 | 0 | { |
832 | 0 | std::lock_guard wlock(_lock); |
833 | 0 | if (_table_map.find(table_id) == _table_map.end()) { |
834 | 0 | _table_map.emplace(table_id, std::make_shared<GroupCommitTable>( |
835 | 0 | _exec_env, _thread_pool.get(), db_id, table_id, |
836 | 0 | _all_block_queues_bytes, this)); |
837 | 0 | } |
838 | 0 | group_commit_table = _table_map[table_id]; |
839 | 0 | } |
840 | 0 | RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue( |
841 | 0 | table_id, base_schema_version, index_size, load_id, load_block_queue, be_exe_version, |
842 | 0 | create_plan_dep, put_block_dep)); |
843 | 0 | return Status::OK(); |
844 | 0 | } |
845 | | |
846 | | Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, |
847 | | std::shared_ptr<LoadBlockQueue>& load_block_queue, |
848 | 0 | std::shared_ptr<Dependency> get_block_dep) { |
849 | 0 | std::shared_ptr<GroupCommitTable> group_commit_table; |
850 | 0 | { |
851 | 0 | std::lock_guard<std::mutex> l(_lock); |
852 | 0 | auto it = _table_map.find(table_id); |
853 | 0 | if (it == _table_map.end()) { |
854 | 0 | return Status::NotFound("table_id: " + std::to_string(table_id) + |
855 | 0 | ", instance_id: " + print_id(instance_id) + " dose not exist"); |
856 | 0 | } |
857 | 0 | group_commit_table = it->second; |
858 | 0 | } |
859 | 0 | return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep); |
860 | 0 | } |
861 | | |
862 | 0 | void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) { |
863 | 0 | std::lock_guard wlock(_lock); |
864 | 0 | if (_table_map.find(table_id) != _table_map.end()) { |
865 | 0 | _table_map.find(table_id)->second->remove_load_id(load_id); |
866 | 0 | } |
867 | 0 | } |
868 | | |
869 | | Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, |
870 | | const std::string& import_label, WalManager* wal_manager, |
871 | 0 | std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) { |
872 | 0 | std::string real_label = config::group_commit_wait_replay_wal_finish |
873 | 0 | ? import_label + "_test_wait" |
874 | 0 | : import_label; |
875 | 0 | RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->create_wal_path( |
876 | 0 | db_id, tb_id, wal_id, real_label, _wal_base_path, WAL_VERSION)); |
877 | 0 | _v_wal_writer = std::make_shared<VWalWriter>(db_id, tb_id, wal_id, real_label, wal_manager, |
878 | 0 | slot_desc, be_exe_version); |
879 | 0 | return _v_wal_writer->init(); |
880 | 0 | } |
881 | | |
882 | 0 | Status LoadBlockQueue::close_wal() { |
883 | 0 | if (_v_wal_writer != nullptr) { |
884 | 0 | RETURN_IF_ERROR(_v_wal_writer->close()); |
885 | 0 | } |
886 | 0 | return Status::OK(); |
887 | 0 | } |
888 | | |
889 | 0 | void LoadBlockQueue::append_dependency(std::shared_ptr<Dependency> finish_dep) { |
890 | 0 | std::lock_guard<std::mutex> lock(mutex); |
891 | | // If not finished, dependencies should be blocked. |
892 | 0 | if (!process_finish) { |
893 | 0 | finish_dep->block(); |
894 | 0 | dependencies.push_back(finish_dep); |
895 | 0 | } |
896 | 0 | } |
897 | | |
898 | 0 | void LoadBlockQueue::append_read_dependency(std::shared_ptr<Dependency> read_dep) { |
899 | 0 | std::lock_guard<std::mutex> lock(mutex); |
900 | 0 | _read_deps.push_back(read_dep); |
901 | 0 | } |
902 | | |
903 | 0 | bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) { |
904 | 0 | DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; }); |
905 | 0 | auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr(); |
906 | 0 | size_t available_bytes = 0; |
907 | 0 | { |
908 | 0 | Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path, &available_bytes); |
909 | 0 | if (!st.ok()) { |
910 | 0 | LOG(WARNING) << "get wal dir available size failed, st=" << st.to_string(); |
911 | 0 | } |
912 | 0 | } |
913 | 0 | if (estimated_wal_bytes < available_bytes) { |
914 | 0 | Status st = |
915 | 0 | wal_mgr->update_wal_dir_estimated_wal_bytes(_wal_base_path, estimated_wal_bytes, 0); |
916 | 0 | if (!st.ok()) { |
917 | 0 | LOG(WARNING) << "update wal dir estimated_wal_bytes failed, reason: " << st.to_string(); |
918 | 0 | } |
919 | 0 | return true; |
920 | 0 | } else { |
921 | 0 | return false; |
922 | 0 | } |
923 | 0 | } |
924 | | |
925 | | } // namespace doris |