be/src/load/memtable/memtable_flush_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/memtable/memtable_flush_executor.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstddef> |
24 | | #include <ostream> |
25 | | |
26 | | #include "common/config.h" |
27 | | #include "common/logging.h" |
28 | | #include "common/metrics/doris_metrics.h" |
29 | | #include "common/metrics/metrics.h" |
30 | | #include "common/signal_handler.h" |
31 | | #include "load/memtable/memtable.h" |
32 | | #include "runtime/thread_context.h" |
33 | | #include "storage/rowset/rowset_writer.h" |
34 | | #include "storage/storage_engine.h" |
35 | | #include "util/debug_points.h" |
36 | | #include "util/pretty_printer.h" |
37 | | #include "util/stopwatch.hpp" |
38 | | #include "util/time.h" |
39 | | |
40 | | namespace doris { |
41 | | using namespace ErrorCode; |
42 | | |
// Process-wide gauge of in-flight memtable flush tasks (bvar metric
// "memtable_flush_task_num"); incremented in MemtableFlushTask's ctor and
// decremented in its dtor.
bvar::Adder<int64_t> g_flush_task_num("memtable_flush_task_num");
44 | | |
45 | | class MemtableFlushTask final : public Runnable { |
46 | | ENABLE_FACTORY_CREATOR(MemtableFlushTask); |
47 | | |
48 | | public: |
49 | | MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable, |
50 | | int32_t segment_id, int64_t submit_task_time) |
51 | 12 | : _flush_token(flush_token), |
52 | 12 | _memtable(memtable), |
53 | 12 | _segment_id(segment_id), |
54 | 12 | _submit_task_time(submit_task_time) { |
55 | 12 | g_flush_task_num << 1; |
56 | 12 | } |
57 | | |
58 | 12 | ~MemtableFlushTask() override { g_flush_task_num << -1; } |
59 | | |
60 | 12 | void run() override { |
61 | 12 | auto token = _flush_token.lock(); |
62 | 12 | if (token) { |
63 | 12 | token->_flush_memtable(_memtable, _segment_id, _submit_task_time); |
64 | 12 | } else { |
65 | 0 | LOG(WARNING) << "flush token is deconstructed, ignore the flush task"; |
66 | 0 | } |
67 | 12 | } |
68 | | |
69 | | private: |
70 | | std::weak_ptr<FlushToken> _flush_token; |
71 | | std::shared_ptr<MemTable> _memtable; |
72 | | int32_t _segment_id; |
73 | | int64_t _submit_task_time; |
74 | | }; |
75 | | |
76 | 0 | std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { |
77 | 0 | os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS |
78 | 0 | << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS |
79 | 0 | << ", flush submit count=" << stat.flush_submit_count |
80 | 0 | << ", running flush count=" << stat.flush_running_count |
81 | 0 | << ", finish flush count=" << stat.flush_finish_count |
82 | 0 | << ", flush bytes: " << stat.flush_size_bytes |
83 | 0 | << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")"; |
84 | 0 | return os; |
85 | 0 | } |
86 | | |
// Queues `mem_table` for asynchronous flushing on the workload group's pool
// (when available) or this token's thread pool. Empty/null memtables are
// accepted and skipped. Returns immediately with the recorded error if an
// earlier flush already failed, or the pool's submit error on failure.
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
    {
        // Fail fast if a previous flush task recorded an error.
        std::shared_lock rdlk(_flush_status_lock);
        DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
            _flush_status = Status::IOError<false>("dbug_be_memtable_submit_flush_error");
        });
        if (!_flush_status.ok()) {
            return _flush_status;
        }
    }

    // Nothing to flush; treat as success.
    if (mem_table == nullptr || mem_table->empty()) {
        return Status::OK();
    }
    int64_t submit_task_time = MonotonicNanos();
    auto task = MemtableFlushTask::create_shared(
            shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
    // NOTE: we should guarantee WorkloadGroup is not deconstructed when submit memtable flush task.
    // because currently WorkloadGroup's can only be destroyed when all queries in the group is finished,
    // but not consider whether load channel is finish.
    std::shared_ptr<WorkloadGroup> wg_sptr = _wg_wptr.lock();
    ThreadPool* wg_thread_pool = nullptr;
    if (wg_sptr) {
        wg_thread_pool = wg_sptr->get_memtable_flush_pool();
    }
    Status ret = wg_thread_pool ? wg_thread_pool->submit(std::move(task))
                                : _thread_pool->submit(std::move(task));
    if (ret.ok()) {
        // _wait_running_task_finish was executed after this function, so no need to notify _cond here
        // NOTE(review): the counter is incremented *after* the task is queued, so a
        // fast worker's decrement may run first and the value is transiently off by
        // one. This appears to rely on submit()/wait() running on a single thread
        // (see the NOTE below this function) — confirm that invariant holds for all
        // callers before relying on this counter elsewhere.
        _stats.flush_submit_count++;
    }
    return ret;
}
120 | | |
121 | | // NOTE: FlushToken's submit/cancel/wait run in one thread, |
122 | | // so we don't need to make them mutually exclusive, std::atomic is enough. |
123 | 15 | void FlushToken::_wait_submit_task_finish() { |
124 | 15 | std::unique_lock<std::mutex> lock(_mutex); |
125 | 27 | _submit_task_finish_cond.wait(lock, [&]() { return _stats.flush_submit_count.load() == 0; }); |
126 | 15 | } |
127 | | |
128 | 30 | void FlushToken::_wait_running_task_finish() { |
129 | 30 | std::unique_lock<std::mutex> lock(_mutex); |
130 | 30 | _running_task_finish_cond.wait(lock, [&]() { return _stats.flush_running_count.load() == 0; }); |
131 | 30 | } |
132 | | |
// Cancels the token: mark it shut down so not-yet-started tasks become
// no-ops, then wait for any already-running flush to drain. The order
// matters — shutdown must be visible before waiting, otherwise a task could
// start running after the wait completes.
void FlushToken::cancel() {
    _shutdown_flush_token();
    _wait_running_task_finish();
}
137 | | |
138 | 15 | Status FlushToken::wait() { |
139 | 15 | _wait_submit_task_finish(); |
140 | 15 | { |
141 | 15 | std::shared_lock rdlk(_flush_status_lock); |
142 | 15 | if (!_flush_status.ok()) { |
143 | 0 | return _flush_status; |
144 | 0 | } |
145 | 15 | } |
146 | 15 | return Status::OK(); |
147 | 15 | } |
148 | | |
149 | | Status FlushToken::_try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context, |
150 | 0 | int64_t size) { |
151 | 0 | auto* thread_context = doris::thread_context(); |
152 | 0 | auto* memtable_flush_executor = |
153 | 0 | ExecEnv::GetInstance()->storage_engine().memtable_flush_executor(); |
154 | 0 | Status st; |
155 | 0 | int32_t max_waiting_time = config::memtable_wait_for_memory_sleep_time_s; |
156 | 0 | do { |
157 | | // only try to reserve process memory |
158 | 0 | st = thread_context->thread_mem_tracker_mgr->try_reserve( |
159 | 0 | size, ThreadMemTrackerMgr::TryReserveChecker::CHECK_PROCESS); |
160 | 0 | if (st.ok()) { |
161 | 0 | memtable_flush_executor->inc_flushing_task(); |
162 | 0 | break; |
163 | 0 | } |
164 | 0 | if (_is_shutdown() || resource_context->task_controller()->is_cancelled()) { |
165 | 0 | st = Status::Cancelled("flush memtable already cancelled"); |
166 | 0 | break; |
167 | 0 | } |
168 | | // Make sure at least one memtable is flushing even reserve memory failed. |
169 | 0 | if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) { |
170 | | // If there are already any flushing task, Wait for some time and retry. |
171 | 0 | LOG_EVERY_T(INFO, 60) << fmt::format( |
172 | 0 | "Failed to reserve memory {} for flush memtable, retry after 100ms", |
173 | 0 | PrettyPrinter::print_bytes(size)); |
174 | 0 | std::this_thread::sleep_for(std::chrono::seconds(1)); |
175 | 0 | max_waiting_time -= 1; |
176 | 0 | } else { |
177 | 0 | st = Status::OK(); |
178 | 0 | break; |
179 | 0 | } |
180 | 0 | } while (max_waiting_time > 0); |
181 | 0 | return st; |
182 | 0 | } |
183 | | |
// Flushes one memtable to disk as segment `segment_id` via the rowset writer.
// On success, `*flush_size` receives the bytes written to disk, the memtable
// is marked flush-successful, its stats are folded into _memtable_stat, and
// the global flush metrics are bumped. Errors from block conversion or the
// segment write are propagated via RETURN_IF_ERROR (skipping the stat/metric
// updates below the scope).
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
                  << ", memsize: " << PrettyPrinter::print_bytes(memtable->memory_usage())
                  << ", rows: " << memtable->stat().raw_rows;
    // Mark the memtable as being flushed so its memory is accounted to FLUSH.
    memtable->update_mem_type(MemType::FLUSH);
    int64_t duration_ns = 0;
    {
        SCOPED_RAW_TIMER(&duration_ns);
        // Attach this thread to the load task's resource context and route
        // allocations to its write tracker while converting/writing the block.
        SCOPED_ATTACH_TASK(memtable->resource_ctx());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                memtable->resource_ctx()->memory_context()->mem_tracker()->write_tracker());
        SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());

        // NOTE(review): the memory-reservation path below is currently disabled
        // (commented out) — presumably pending rework of flush memory
        // reservation; confirm intent before deleting it outright.

        // DEFER_RELEASE_RESERVED();

        // auto reserve_size = memtable->get_flush_reserve_memory_size();
        // if (memtable->resource_ctx()->task_controller()->is_enable_reserve_memory() &&
        //     reserve_size > 0) {
        //     RETURN_IF_ERROR(_try_reserve_memory(memtable->resource_ctx(), reserve_size));
        // }

        // Defer defer {[&]() {
        //     ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
        // }};
        std::unique_ptr<Block> block;
        RETURN_IF_ERROR(memtable->to_block(&block));
        RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
        memtable->set_flush_success();
    }
    _memtable_stat += memtable->stat();
    DorisMetrics::instance()->memtable_flush_total->increment(1);
    DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
    VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
                  << ", flushsize: " << PrettyPrinter::print_bytes(*flush_size);
    return Status::OK();
}
220 | | |
221 | | void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, |
222 | 12 | int64_t submit_task_time) { |
223 | 12 | signal::set_signal_task_id(_rowset_writer->load_id()); |
224 | 12 | signal::tablet_id = memtable_ptr->tablet_id(); |
225 | 12 | Defer defer {[&]() { |
226 | 12 | std::lock_guard<std::mutex> lock(_mutex); |
227 | 12 | _stats.flush_submit_count--; |
228 | 12 | if (_stats.flush_submit_count == 0) { |
229 | 12 | _submit_task_finish_cond.notify_one(); |
230 | 12 | } |
231 | 12 | _stats.flush_running_count--; |
232 | 12 | if (_stats.flush_running_count == 0) { |
233 | 12 | _running_task_finish_cond.notify_one(); |
234 | 12 | } |
235 | 12 | }}; |
236 | 12 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_before_first_shutdown", |
237 | 12 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
238 | 12 | if (_is_shutdown()) { |
239 | 0 | return; |
240 | 0 | } |
241 | 12 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_first_shutdown", |
242 | 12 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
243 | 12 | _stats.flush_running_count++; |
244 | | // double check if shutdown to avoid wait running task finish count not accurate |
245 | 12 | if (_is_shutdown()) { |
246 | 0 | return; |
247 | 0 | } |
248 | 12 | DBUG_EXECUTE_IF("FlushToken.flush_memtable.wait_after_second_shutdown", |
249 | 12 | { std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000)); }); |
250 | 12 | uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; |
251 | 12 | _stats.flush_wait_time_ns += flush_wait_time_ns; |
252 | | // If previous flush has failed, return directly |
253 | 12 | { |
254 | 12 | std::shared_lock rdlk(_flush_status_lock); |
255 | 12 | if (!_flush_status.ok()) { |
256 | 0 | return; |
257 | 0 | } |
258 | 12 | } |
259 | | |
260 | 12 | MonotonicStopWatch timer; |
261 | 12 | timer.start(); |
262 | 12 | size_t memory_usage = memtable_ptr->memory_usage(); |
263 | | |
264 | 12 | int64_t flush_size; |
265 | 12 | Status s = _do_flush_memtable(memtable_ptr.get(), segment_id, &flush_size); |
266 | | |
267 | 12 | { |
268 | 12 | std::shared_lock rdlk(_flush_status_lock); |
269 | 12 | if (!_flush_status.ok()) { |
270 | 0 | return; |
271 | 0 | } |
272 | 12 | } |
273 | 12 | if (!s.ok()) { |
274 | 0 | std::lock_guard wrlk(_flush_status_lock); |
275 | 0 | LOG(WARNING) << "Flush memtable failed with res = " << s |
276 | 0 | << ", load_id: " << print_id(_rowset_writer->load_id()); |
277 | 0 | _flush_status = s; |
278 | 0 | return; |
279 | 0 | } |
280 | | |
281 | 12 | VLOG_CRITICAL << "flush memtable wait time: " |
282 | 3 | << PrettyPrinter::print(flush_wait_time_ns, TUnit::TIME_NS) |
283 | 3 | << ", flush memtable cost: " |
284 | 3 | << PrettyPrinter::print(timer.elapsed_time(), TUnit::TIME_NS) |
285 | 3 | << ", submit count: " << _stats.flush_submit_count |
286 | 3 | << ", running count: " << _stats.flush_running_count |
287 | 3 | << ", finish count: " << _stats.flush_finish_count |
288 | 3 | << ", mem size: " << PrettyPrinter::print_bytes(memory_usage) |
289 | 3 | << ", disk size: " << PrettyPrinter::print_bytes(flush_size); |
290 | 12 | _stats.flush_time_ns += timer.elapsed_time(); |
291 | 12 | _stats.flush_finish_count++; |
292 | 12 | _stats.flush_size_bytes += memtable_ptr->memory_usage(); |
293 | 12 | _stats.flush_disk_size_bytes += flush_size; |
294 | 12 | } |
295 | | |
296 | 38 | void MemTableFlushExecutor::init(int num_disk) { |
297 | 38 | _num_disk = std::max(1, num_disk); |
298 | 38 | int num_cpus = std::thread::hardware_concurrency(); |
299 | 38 | int min_threads = std::max(1, config::flush_thread_num_per_store); |
300 | 38 | int max_threads = num_cpus == 0 ? _num_disk * min_threads |
301 | 38 | : std::min(_num_disk * min_threads, |
302 | 38 | num_cpus * config::max_flush_thread_num_per_cpu); |
303 | 38 | static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool") |
304 | 38 | .set_min_threads(min_threads) |
305 | 38 | .set_max_threads(max_threads) |
306 | 38 | .build(&_flush_pool)); |
307 | | |
308 | 38 | min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); |
309 | 38 | max_threads = num_cpus == 0 ? _num_disk * min_threads |
310 | 38 | : std::min(_num_disk * min_threads, |
311 | 38 | num_cpus * config::max_flush_thread_num_per_cpu); |
312 | 38 | static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool") |
313 | 38 | .set_min_threads(min_threads) |
314 | 38 | .set_max_threads(max_threads) |
315 | 38 | .build(&_high_prio_flush_pool)); |
316 | 38 | } |
317 | | |
318 | 10 | void MemTableFlushExecutor::update_memtable_flush_threads() { |
319 | 10 | int num_cpus = std::thread::hardware_concurrency(); |
320 | 10 | int min_threads = std::max(1, config::flush_thread_num_per_store); |
321 | 10 | int max_threads = num_cpus == 0 ? _num_disk * min_threads |
322 | 10 | : std::min(_num_disk * min_threads, |
323 | 10 | num_cpus * config::max_flush_thread_num_per_cpu); |
324 | | // Update max_threads first to avoid constraint violation when increasing min_threads |
325 | 10 | static_cast<void>(_flush_pool->set_max_threads(max_threads)); |
326 | 10 | static_cast<void>(_flush_pool->set_min_threads(min_threads)); |
327 | | |
328 | 10 | min_threads = std::max(1, config::high_priority_flush_thread_num_per_store); |
329 | 10 | max_threads = num_cpus == 0 ? _num_disk * min_threads |
330 | 10 | : std::min(_num_disk * min_threads, |
331 | 10 | num_cpus * config::max_flush_thread_num_per_cpu); |
332 | | // Update max_threads first to avoid constraint violation when increasing min_threads |
333 | 10 | static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads)); |
334 | 10 | static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads)); |
335 | 10 | } |
336 | | |
337 | | // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. |
338 | | Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token, |
339 | | std::shared_ptr<RowsetWriter> rowset_writer, |
340 | | bool is_high_priority, |
341 | 15 | std::shared_ptr<WorkloadGroup> wg_sptr) { |
342 | 15 | switch (rowset_writer->type()) { |
343 | 0 | case ALPHA_ROWSET: |
344 | | // alpha rowset do not support flush in CONCURRENT. and not support alpha rowset now. |
345 | 0 | return Status::InternalError<false>("not support alpha rowset load now."); |
346 | 15 | case BETA_ROWSET: { |
347 | | // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. |
348 | 15 | ThreadPool* pool = is_high_priority ? _high_prio_flush_pool.get() : _flush_pool.get(); |
349 | 15 | flush_token = FlushToken::create_shared(pool, wg_sptr); |
350 | 15 | flush_token->set_rowset_writer(rowset_writer); |
351 | 15 | return Status::OK(); |
352 | 0 | } |
353 | 0 | default: |
354 | 0 | return Status::InternalError<false>("unknown rowset type."); |
355 | 15 | } |
356 | 15 | } |
357 | | |
358 | | } // namespace doris |