be/src/load/memtable/memtable_flush_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/memtable/memtable_flush_executor.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstddef> |
24 | | #include <ostream> |
25 | | |
26 | | #include "common/config.h" |
27 | | #include "common/logging.h" |
28 | | #include "common/metrics/doris_metrics.h" |
29 | | #include "common/metrics/metrics.h" |
30 | | #include "common/metrics/system_metrics.h" |
31 | | #include "common/signal_handler.h" |
32 | | #include "exec/sink/autoinc_buffer.h" |
33 | | #include "load/memtable/memtable.h" |
34 | | #include "runtime/thread_context.h" |
35 | | #include "storage/binlog.h" |
36 | | #include "storage/rowset/group_rowset_writer.h" |
37 | | #include "storage/rowset/rowset_writer.h" |
38 | | #include "storage/storage_engine.h" |
39 | | #include "storage/tablet_info.h" |
40 | | #include "util/debug_points.h" |
41 | | #include "util/pretty_printer.h" |
42 | | #include "util/stopwatch.hpp" |
43 | | #include "util/time.h" |
44 | | |
45 | | namespace doris { |
46 | | using namespace ErrorCode; |
47 | | |
48 | | bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num"); |
49 | | |
50 | | class MemtableFlushTask : public Runnable { |
51 | | ENABLE_FACTORY_CREATOR(MemtableFlushTask); |
52 | | |
53 | | public: |
54 | | MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable, |
55 | | int32_t segment_id, int64_t submit_task_time) |
56 | 18 | : _flush_token(flush_token), |
57 | 18 | _memtable(memtable), |
58 | 18 | _segment_id(segment_id), |
59 | 18 | _submit_task_time(submit_task_time) { |
60 | 18 | g_flush_task_num << 1; |
61 | 18 | } |
62 | | |
63 | 18 | ~MemtableFlushTask() override { g_flush_task_num << -1; } |
64 | | |
65 | 12 | void run() override { |
66 | 12 | auto token = _flush_token.lock(); |
67 | 12 | if (token) { |
68 | 12 | token->_flush_memtable(_memtable, _segment_id, _submit_task_time); |
69 | 12 | } else { |
70 | 0 | LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; |
71 | 0 | } |
72 | 12 | } |
73 | | |
74 | | protected: |
75 | | std::weak_ptr<FlushToken> _flush_token; |
76 | | std::shared_ptr<MemTable> _memtable; |
77 | | int32_t _segment_id; |
78 | | int64_t _submit_task_time; |
79 | | }; |
80 | | |
81 | | class PartOfGroupMemtableFlushTask final : public MemtableFlushTask { |
82 | | ENABLE_FACTORY_CREATOR(PartOfGroupMemtableFlushTask); |
83 | | |
84 | | public: |
85 | | PartOfGroupMemtableFlushTask(std::shared_ptr<FlushToken> flush_token, |
86 | | std::shared_ptr<SharedMemtable> shared_memtable, |
87 | | WriteRequestType write_req_type, int64_t submit_task_time) |
88 | 6 | : MemtableFlushTask(flush_token, nullptr, 0, submit_task_time), |
89 | 6 | _shared_memtable(std::move(shared_memtable)), |
90 | 6 | _write_req_type(write_req_type) {} |
91 | | |
92 | 6 | void run() override { |
93 | 6 | auto token = _flush_token.lock(); |
94 | 6 | if (token) { |
95 | 6 | token->_flush_group_memtable(_shared_memtable, _write_req_type, _submit_task_time); |
96 | 6 | } else { |
97 | 0 | LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; |
98 | 0 | } |
99 | 6 | } |
100 | | |
101 | | private: |
102 | | std::shared_ptr<SharedMemtable> _shared_memtable; |
103 | | WriteRequestType _write_req_type; |
104 | | }; |
105 | | |
106 | 0 | std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { |
107 | 0 | os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS |
108 | 0 | << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS |
109 | 0 | << ", flush submit count=" << stat.flush_submit_count |
110 | 0 | << ", running flush count=" << stat.flush_running_count |
111 | 0 | << ", finish flush count=" << stat.flush_finish_count |
112 | 0 | << ", flush bytes: " << stat.flush_size_bytes |
113 | 0 | << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")"; |
114 | 0 | return os; |
115 | 0 | } |
116 | | |
117 | 3 | SharedMemtable::~SharedMemtable() { |
118 | 3 | if (block == nullptr) { |
119 | 0 | return; |
120 | 0 | } |
121 | 3 | DCHECK(memtable != nullptr); |
122 | 3 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
123 | 3 | memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker()); |
124 | 3 | SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker()); |
125 | 3 | block.reset(); |
126 | 3 | } |
127 | | |
128 | | Status FlushToken::_submit_sub_tasks(ThreadPool* pool, |
129 | 15 | std::vector<std::shared_ptr<Runnable>> sub_tasks) { |
130 | 33 | for (int i = 0; i < sub_tasks.size(); ++i) { |
131 | 18 | { |
132 | 18 | std::shared_lock rdlk(_flush_status_lock); |
133 | 18 | DBUG_EXECUTE_IF("FlushToken.submit_sub_task_error", { |
134 | 18 | if (i != 0) { |
135 | | // only affect flush binlog task |
136 | 18 | _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error"); |
137 | 18 | } |
138 | 18 | }); |
139 | 18 | if (!_flush_status.ok()) { |
140 | 0 | return _flush_status; |
141 | 0 | } |
142 | 18 | } |
143 | 18 | Status submit_st = pool->submit(std::move(sub_tasks[i])); |
144 | 18 | if (UNLIKELY(!submit_st.ok())) { |
145 | 0 | { |
146 | 0 | std::lock_guard wrlk(_flush_status_lock); |
147 | 0 | if (_flush_status.ok()) { |
148 | 0 | _flush_status = submit_st; |
149 | 0 | } |
150 | 0 | } |
151 | 0 | _shutdown_flush_token(); |
152 | 0 | return submit_st; |
153 | 0 | } |
154 | 18 | _stats.flush_submit_count++; |
155 | 18 | } |
156 | 15 | return Status::OK(); |
157 | 15 | } |
158 | | |
159 | 18 | Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) { |
160 | 18 | { |
161 | 18 | std::shared_lock rdlk(_flush_status_lock); |
162 | 18 | DBUG_EXECUTE_IF("FlushToken.submit_flush_error", { |
163 | 18 | _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error"); |
164 | 18 | }); |
165 | 18 | if (!_flush_status.ok()) { |
166 | 0 | return _flush_status; |
167 | 0 | } |
168 | 18 | } |
169 | | |
170 | 18 | if (mem_table == nullptr || mem_table->empty()) { |
171 | 3 | return Status::OK(); |
172 | 3 | } |
173 | 15 | int64_t submit_task_time = MonotonicNanos(); |
174 | 15 | auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get()); |
175 | 15 | std::shared_ptr<SharedMemtable> shared_memtable; |
176 | 15 | std::vector<std::shared_ptr<Runnable>> tasks; |
177 | 15 | if (group_rowset_writer != nullptr) { |
178 | 3 | auto data_writer = group_rowset_writer->data_writer(); |
179 | 3 | auto binlog_writer = group_rowset_writer->row_binlog_writer(); |
180 | 3 | DCHECK(data_writer != nullptr); |
181 | 3 | DCHECK(binlog_writer != nullptr); |
182 | | |
183 | 3 | shared_memtable = std::make_shared<SharedMemtable>(); |
184 | 3 | shared_memtable->memtable = mem_table; |
185 | | // Keep data/binlog segment_id allocators in sync. |
186 | 3 | auto segment_id = data_writer->allocate_segment_id(); |
187 | 3 | auto binlog_segment_id = binlog_writer->allocate_segment_id(); |
188 | 3 | DCHECK_EQ(segment_id, binlog_segment_id); |
189 | 3 | shared_memtable->segment_id = segment_id; |
190 | | |
191 | 3 | if (binlog_writer->context().write_binlog_opt().need_build_binlog()) { |
192 | 3 | if (_row_binlog_lsn_buffer == nullptr) { |
193 | 0 | std::unique_lock<std::mutex> lock(_mutex); |
194 | 0 | if (_row_binlog_lsn_buffer == nullptr) { |
195 | 0 | if (_table_schema_param == nullptr) { |
196 | 0 | return Status::InternalError<false>( |
197 | 0 | "need binlog but table_schema_param is null"); |
198 | 0 | } |
199 | 0 | _row_binlog_lsn_buffer = |
200 | 0 | GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
201 | 0 | _table_schema_param->db_id(), _table_schema_param->table_id(), |
202 | 0 | kBinlogLsnAutoIncId); |
203 | 0 | } |
204 | 0 | } |
205 | 3 | std::shared_ptr<std::vector<int128_t>> lsn; |
206 | 3 | RETURN_IF_ERROR( |
207 | 3 | allocate_binlog_lsn(_row_binlog_lsn_buffer, mem_table->raw_rows(), &lsn)); |
208 | 3 | DCHECK(lsn != nullptr && !lsn->empty()); |
209 | 3 | const_cast<RowsetWriterContext&>(binlog_writer->context()) |
210 | 3 | .write_binlog_opt() |
211 | 3 | .write_binlog_config() |
212 | 3 | .insert_seg_lsn(shared_memtable->segment_id, lsn); |
213 | 3 | } |
214 | | |
215 | 3 | tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared( |
216 | 3 | shared_from_this(), shared_memtable, WriteRequestType::DATA_IN_GROUP, |
217 | 3 | submit_task_time)); |
218 | 3 | tasks.emplace_back(PartOfGroupMemtableFlushTask::create_shared( |
219 | 3 | shared_from_this(), shared_memtable, WriteRequestType::BINLOG_IN_GROUP, |
220 | 3 | submit_task_time)); |
221 | 12 | } else { |
222 | 12 | tasks.emplace_back(MemtableFlushTask::create_shared(shared_from_this(), mem_table, |
223 | 12 | _rowset_writer->allocate_segment_id(), |
224 | 12 | submit_task_time)); |
225 | 12 | } |
226 | | // NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task. |
227 | | // because currently WorkloadGroup's can only be destroyed when all queries in the group is finished, |
228 | | // but not consider whether load channel is finish. |
229 | 15 | std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock(); |
230 | 15 | ThreadPool* wg_thread_pool = nullptr; |
231 | 15 | if (wg_sptr) { |
232 | 0 | wg_thread_pool = wg_sptr->get_memtable_flush_pool(); |
233 | 0 | } |
234 | 15 | ThreadPool* pool = wg_thread_pool ? wg_thread_pool : _thread_pool; |
235 | | |
236 | 15 | return _submit_sub_tasks(pool, std::move(tasks)); |
237 | 15 | } |
238 | | |
239 | | void FlushToken::_flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable, |
240 | 6 | WriteRequestType write_req_type, int64_t submit_task_time) { |
241 | 6 | DCHECK(shared_memtable != nullptr); |
242 | 6 | DCHECK(shared_memtable->memtable != nullptr); |
243 | 6 | DCHECK(write_req_type == WriteRequestType::DATA_IN_GROUP || |
244 | 6 | write_req_type == WriteRequestType::BINLOG_IN_GROUP); |
245 | | |
246 | 6 | auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(_rowset_writer.get()); |
247 | 6 | DCHECK(group_rowset_writer != nullptr); |
248 | 6 | auto flush_writer = write_req_type == WriteRequestType::DATA_IN_GROUP |
249 | 6 | ? group_rowset_writer->data_writer() |
250 | 6 | : group_rowset_writer->row_binlog_writer(); |
251 | 6 | DCHECK(flush_writer != nullptr); |
252 | 6 | _flush_memtable_impl(flush_writer.get(), shared_memtable->memtable.get(), |
253 | 6 | shared_memtable->segment_id, submit_task_time, shared_memtable.get()); |
254 | 6 | } |
255 | | |
256 | | // NOTE: FlushToken's submit/cancel/wait run in one thread, |
257 | | // so we don't need to make them mutually exclusive, std::atomic is enough. |
258 | 18 | void FlushToken::_wait_submit_task_finish() { |
259 | 18 | std::unique_lock<std::mutex> lock(_mutex); |
260 | 33 | _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; }); |
261 | 18 | } |
262 | | |
263 | 30 | void FlushToken::_wait_running_task_finish() { |
264 | 30 | std::unique_lock<std::mutex> lock(_mutex); |
265 | 30 | _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; }); |
266 | 30 | } |
267 | | |
268 | 30 | void FlushToken::cancel() { |
269 | 30 | _shutdown_flush_token(); |
270 | 30 | _wait_running_task_finish(); |
271 | 30 | } |
272 | | |
273 | 18 | Status FlushToken::wait() { |
274 | 18 | _wait_submit_task_finish(); |
275 | 18 | { |
276 | 18 | std::shared_lock rdlk(_flush_status_lock); |
277 | 18 | if (!_flush_status.ok()) { |
278 | 2 | return _flush_status; |
279 | 2 | } |
280 | 18 | } |
281 | 16 | return Status::OK(); |
282 | 18 | } |
283 | | |
284 | | Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context, |
285 | 0 | int64_t size) { |
286 | 0 | auto* thread_context = doris::thread_context(); |
287 | 0 | auto* memtable_flush_executor = |
288 | 0 | ExecEnv::GetInstance()->storage_engine().memtable_flush_executor(); |
289 | 0 | Status st; |
290 | 0 | int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s; |
291 | 0 | do { |
292 | | // only try to reserve process memory |
293 | 0 | st = thread_context->thread_mem_tracker_mgr->try_reserve( |
294 | 0 | size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS); |
295 | 0 | if (st.ok()) { |
296 | 0 | memtable_flush_executor->inc_flushing_task(); |
297 | 0 | break; |
298 | 0 | } |
299 | 0 | if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) { |
300 | 0 | st = Status::Cancelled("flush memtable already cancelled"); |
301 | 0 | break; |
302 | 0 | } |
303 | | // Make sure at least one memtable is flushing even reserve memory failed. |
304 | 0 | if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) { |
305 | | // If there are already any flushing task, Wait for some time and retry. |
306 | 0 | LOG_EVERY_T(INFO, 60) << fmt::format( |
307 | 0 | "Failed to reserve memory {} for flush memtable, retry after 100ms", |
308 | 0 | PrettyPrinter::print_bytes(size)); |
309 | 0 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
310 | 0 | max_waiting_time -= 1; |
311 | 0 | } else { |
312 | 0 | st = Status::OK(); |
313 | 0 | break; |
314 | 0 | } |
315 | 0 | } while (max_waiting_time > 0); |
316 | 0 | return st; |
317 | 0 | } |
318 | | |
319 | | Status FlushToken::_memtable2block(MemTable* memtable, SharedMemtable* shared_memtable, |
320 | 17 | std::shared_ptr<Block>& flush_block) { |
321 | 17 | DCHECK(memtable != nullptr); |
322 | | |
323 | 17 | if (shared_memtable == nullptr) { |
324 | 12 | std::unique_ptr<Block> block; |
325 | 12 | RETURN_IF_ERROR(memtable->to_block(&block)); |
326 | 12 | flush_block.reset(block.release()); |
327 | 12 | return Status::OK(); |
328 | 12 | } |
329 | | |
330 | 5 | std::call_once(shared_memtable->block_once, [&]() { |
331 | 3 | std::unique_ptr<Block> block; |
332 | 3 | shared_memtable->block_status = memtable->to_block(&block); |
333 | 3 | if (shared_memtable->block_status.ok()) { |
334 | 3 | shared_memtable->block.reset(block.release()); |
335 | 3 | } |
336 | 3 | }); |
337 | 5 | if (!shared_memtable->block_status.ok()) { |
338 | 0 | return shared_memtable->block_status; |
339 | 0 | } |
340 | 5 | flush_block = shared_memtable->block; |
341 | 5 | DCHECK(flush_block != nullptr); |
342 | 5 | return Status::OK(); |
343 | 5 | } |
344 | | |
345 | | void FlushToken::_flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable, |
346 | | int32_t segment_id, int64_t submit_task_time, |
347 | 18 | SharedMemtable* shared_memtable) { |
348 | 18 | DCHECK(flush_writer != nullptr); |
349 | 18 | DCHECK(memtable != nullptr); |
350 | | |
351 | 18 | signal::set_signal_task_id(flush_writer->load_id()); |
352 | 18 | signal::tablet_id = memtable->tablet_id(); |
353 | | // Count the task as running before registering the deferred cleanup so |
354 | | // cancel/shutdown paths keep flush_running_count symmetric on every exit. |
355 | 18 | _stats.flush_running_count++; |
356 | 18 | Defer defer {[&]() { |
357 | 18 | std::lock_guard<std::mutex> lock(_mutex); |
358 | 18 | _stats.flush_submit_count--; |
359 | 18 | if (_stats.flush_submit_count == 0) { |
360 | 15 | _submit_task_finish_cond.notify_one(); |
361 | 15 | } |
362 | 18 | _stats.flush_running_count--; |
363 | 18 | if (_stats.flush_running_count == 0) { |
364 | 15 | _running_task_finish_cond.notify_one(); |
365 | 15 | } |
366 | 18 | }}; |
367 | 18 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown", |
368 | 18 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
369 | 18 | if (_is_shutdown()) { |
370 | 0 | return; |
371 | 0 | } |
372 | 18 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown", |
373 | 18 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
374 | | // double check if shutdown to avoid wait running task finish count not accurate |
375 | 18 | if (_is_shutdown()) { |
376 | 0 | return; |
377 | 0 | } |
378 | 18 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown", |
379 | 18 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
380 | 18 | uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; |
381 | 18 | _stats.flush_wait_time_ns += flush_wait_time_ns; |
382 | | // If previous flush has failed, return directly |
383 | 18 | { |
384 | 18 | std::shared_lock rdlk(_flush_status_lock); |
385 | 18 | if (!_flush_status.ok()) { |
386 | 0 | return; |
387 | 0 | } |
388 | 18 | } |
389 | | |
390 | 18 | MonotonicStopWatch timer; |
391 | 18 | timer.start(); |
392 | 18 | size_t memory_usage = memtable->memory_usage(); |
393 | | |
394 | 18 | int64_t flush_size = 0; |
395 | 18 | Status s; |
396 | 18 | memtable->update_mem_type(MemType::FLUSH); |
397 | 18 | int64_t duration_ns = 0; |
398 | 18 | { |
399 | 18 | s = [&]() { |
400 | 18 | SCOPED_RAW_TIMER(&duration_ns); |
401 | 18 | SCOPED_ATTACH_TASK(memtable->resource_ctx()); |
402 | 18 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
403 | 18 | memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker()); |
404 | 18 | SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker()); |
405 | | |
406 | | // DEFER_RELEASE_RESERVED(); |
407 | | |
408 | | // auto reserve_size = memtable->get_flush_reserve_memory_size(); |
409 | | // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() && |
410 | | // reserve_size > 0) { |
411 | | // RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size)); |
412 | | // } |
413 | | |
414 | | // Defer defer {[&]() { |
415 | | // ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task(); |
416 | | // }}; |
417 | 18 | std::shared_ptr<Block> flush_block; |
418 | 18 | RETURN_IF_ERROR(_memtable2block(memtable, shared_memtable, flush_block)); |
419 | 18 | RETURN_IF_ERROR( |
420 | 18 | flush_writer->flush_memtable(flush_block.get(), segment_id, &flush_size)); |
421 | 16 | memtable->set_flush_success(); |
422 | | |
423 | 16 | return Status::OK(); |
424 | 18 | }(); |
425 | | |
426 | 18 | if (s.ok()) { |
427 | 16 | bool record_memtable_stat = shared_memtable == nullptr; |
428 | 16 | if (shared_memtable != nullptr) { |
429 | 4 | auto finished_sub_task_count = shared_memtable->add_finished_sub_task() + 1; |
430 | 4 | record_memtable_stat = |
431 | 4 | finished_sub_task_count == shared_memtable->total_sub_task_count.load(); |
432 | 4 | } |
433 | 16 | if (record_memtable_stat) { |
434 | 13 | _memtable_stat += memtable->stat(); |
435 | 13 | } |
436 | 16 | DorisMetrics::instance()->memtable_flush_total->increment(1); |
437 | 16 | DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); |
438 | 16 | } |
439 | 18 | } |
440 | | |
441 | 18 | { |
442 | 18 | std::shared_lock rdlk(_flush_status_lock); |
443 | 18 | if (!_flush_status.ok()) { |
444 | 0 | return; |
445 | 0 | } |
446 | 18 | } |
447 | 18 | if (!s.ok()) { |
448 | 2 | std::lock_guard wrlk(_flush_status_lock); |
449 | 2 | if (_flush_status.ok()) { |
450 | 2 | LOG(WARNING) << "Flush memtable failed with res = " << s |
451 | 2 | << ", load_id: " << print_id(flush_writer->load_id()); |
452 | 2 | _flush_status = s; |
453 | 2 | } |
454 | 2 | _shutdown_flush_token(); |
455 | 2 | return; |
456 | 2 | } |
457 | | |
458 | 16 | VLOG_CRITICAL << "flush memtable wait time: " |
459 | 3 | << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS) |
460 | 3 | << ", flush memtable cost: " |
461 | 3 | << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) |
462 | 3 | << ", submit count: " << _stats.flush_submit_count |
463 | 3 | << ", running count: " << _stats.flush_running_count |
464 | 3 | << ", finish count: " << _stats.flush_finish_count |
465 | 3 | << ", mem size: " << PrettyPrinter::print_bytes(memory_usage) |
466 | 3 | << ", disk size: " << PrettyPrinter::print_bytes(flush_size); |
467 | 16 | _stats.flush_time_ns += timer.elapsed_time(); |
468 | 16 | _stats.flush_finish_count++; |
469 | 16 | _stats.flush_size_bytes += memtable->memory_usage(); |
470 | 16 | _stats.flush_disk_size_bytes += flush_size; |
471 | 16 | } |
472 | | |
473 | | void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, |
474 | 12 | int64_t submit_task_time) { |
475 | 12 | _flush_memtable_impl(_rowset_writer.get(), memtable_ptr.get(), segment_id, submit_task_time); |
476 | 12 | } |
477 | | |
478 | | std::pair<int, int> MemTableFlushExecutor::calc_flush_thread_count(int num_cpus, int num_disk, |
479 | 113 | int thread_num_per_store) { |
480 | 113 | if (config::enable_adaptive_flush_threads && num_cpus > 0) { |
481 | 97 | int min = std::max(1, (int)(num_cpus * config::min_flush_thread_num_per_cpu)); |
482 | 97 | int max = std::max(min, num_cpus * config::max_flush_thread_num_per_cpu); |
483 | 97 | return {min, max}; |
484 | 97 | } |
485 | 16 | int min = std::max(1, thread_num_per_store); |
486 | 16 | int max = num_cpus == 0 |
487 | 16 | ? num_disk * min |
488 | 16 | : std::min(num_disk * min, num_cpus * config::max_flush_thread_num_per_cpu); |
489 | 16 | return {min, max}; |
490 | 113 | } |
491 | | |
492 | 43 | void MemTableFlushExecutor::init(int num_disk) { |
493 | 43 | _num_disk = std::max(1, num_disk); |
494 | 43 | int num_cpus = std::thread::hardware_concurrency(); |
495 | | |
496 | 43 | auto [min_threads, max_threads] = |
497 | 43 | calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store); |
498 | 43 | static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool") |
499 | 43 | .set_min_threads(min_threads) |
500 | 43 | .set_max_threads(max_threads) |
501 | 43 | .build(&_flush_pool)); |
502 | | |
503 | 43 | auto [hi_min, hi_max] = calc_flush_thread_count( |
504 | 43 | num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store); |
505 | 43 | static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") |
506 | 43 | .set_min_threads(hi_min) |
507 | 43 | .set_max_threads(hi_max) |
508 | 43 | .build(&_high_prio_flush_pool)); |
509 | 43 | } |
510 | | |
511 | 11 | void MemTableFlushExecutor::update_memtable_flush_threads() { |
512 | 11 | int num_cpus = std::thread::hardware_concurrency(); |
513 | | |
514 | 11 | auto [min_threads, max_threads] = |
515 | 11 | calc_flush_thread_count(num_cpus, _num_disk, config::flush_thread_num_per_store); |
516 | | // Update max_threads first to avoid constraint violation when increasing min_threads |
517 | 11 | static_cast<void>(_flush_pool->set_max_threads(max_threads)); |
518 | 11 | static_cast<void>(_flush_pool->set_min_threads(min_threads)); |
519 | | |
520 | 11 | auto [hi_min, hi_max] = calc_flush_thread_count( |
521 | 11 | num_cpus, _num_disk, config::high_priority_flush_thread_num_per_store); |
522 | | // Update max_threads first to avoid constraint violation when increasing min_threads |
523 | 11 | static_cast<void>(_high_prio_flush_pool->set_max_threads(hi_max)); |
524 | 11 | static_cast<void>(_high_prio_flush_pool->set_min_threads(hi_min)); |
525 | 11 | } |
526 | | |
527 | | // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. |
528 | | Status MemTableFlushExecutor::create_flush_token( |
529 | | std::shared_ptr<FlushToken>& flush_token, std::shared_ptr<RowsetWriter> rowset_writer, |
530 | | bool is_high_priority, std::shared_ptr<WorkloadGroup> wg_sptr, |
531 | 17 | std::shared_ptr<OlapTableSchemaParam> table_schema_param) { |
532 | 17 | switch (rowset_writer->type()) { |
533 | 0 | case ALPHA_ROWSET: |
534 | | // alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now. |
535 | 0 | return Status::InternalError<false>("not support alpha rowset load now."); |
536 | 17 | case BETA_ROWSET: { |
537 | | // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. |
538 | 17 | ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get(); |
539 | 17 | flush_token = FlushToken::create_shared(pool, wg_sptr); |
540 | 17 | flush_token->set_rowset_writer(rowset_writer); |
541 | 17 | flush_token->set_table_schema_param(std::move(table_schema_param)); |
542 | 17 | return Status::OK(); |
543 | 0 | } |
544 | 0 | default: |
545 | 0 | return Status::InternalError<false>("unknown rowset type."); |
546 | 17 | } |
547 | 17 | } |
548 | | |
549 | | } // namespace doris |