be/src/load/memtable/memtable_flush_executor.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>

#include "common/status.h"
#include "load/memtable/memtable.h"
#include "util/threadpool.h"
31 | | |
32 | | namespace doris { |
33 | | |
class DataDir;
class MemTable;
class RowsetWriter;
class WorkloadGroup;

// Flush counters of a single flush handler (a FlushToken exposes its copy via get_stats()).
// Each field is an atomic because several threads may update the counters concurrently.
struct FlushStatistic {
    // Accumulated time counters.
    std::atomic_uint64_t flush_time_ns {0};
    std::atomic_uint64_t flush_wait_time_ns {0};

    // Task lifecycle counters. The running counter is signed.
    std::atomic_uint64_t flush_submit_count {0};
    std::atomic_int64_t flush_running_count {0};
    std::atomic_uint64_t flush_finish_count {0};

    // Size counters.
    std::atomic_uint64_t flush_size_bytes {0};
    std::atomic_uint64_t flush_disk_size_bytes {0};
};

// Prints a human-readable summary of `stat` to `os` (defined out of line).
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
52 | | |
53 | | // A thin wrapper of ThreadPoolToken to submit task. |
54 | | // For a tablet, there may be multiple memtables, which will be flushed to disk |
55 | | // one by one in the order of generation. |
56 | | // If a memtable flush fails, then: |
57 | | // 1. Immediately disallow submission of any subsequent memtable |
58 | | // 2. For the memtables that have already been submitted, there is no need to flush, |
59 | | // because the entire job will definitely fail; |
60 | | class FlushToken : public std::enable_shared_from_this<FlushToken> { |
61 | | ENABLE_FACTORY_CREATOR(FlushToken); |
62 | | |
63 | | public: |
64 | | FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr) |
65 | 15 | : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {} |
66 | | |
67 | | Status submit(std::shared_ptr<MemTable> mem_table); |
68 | | |
69 | | // error has happens, so we cancel this token |
70 | | // And remove all tasks in the queue. |
71 | | void cancel(); |
72 | | |
73 | | // wait all tasks in token to be completed. |
74 | | Status wait(); |
75 | | |
76 | | // get flush operations' statistics |
77 | 35 | const FlushStatistic& get_stats() const { return _stats; } |
78 | | |
79 | 15 | void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) { |
80 | 15 | _rowset_writer = rowset_writer; |
81 | 15 | } |
82 | | |
83 | 30 | const MemTableStat& memtable_stat() { return _memtable_stat; } |
84 | | |
85 | | private: |
86 | 30 | void _shutdown_flush_token() { _shutdown.store(true); } |
87 | 24 | bool _is_shutdown() { return _shutdown.load(); } |
88 | | void _wait_submit_task_finish(); |
89 | | void _wait_running_task_finish(); |
90 | | |
91 | | private: |
92 | | friend class MemtableFlushTask; |
93 | | |
94 | | void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, |
95 | | int64_t submit_task_time); |
96 | | |
97 | | Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size); |
98 | | |
99 | | Status _try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context, |
100 | | int64_t size); |
101 | | |
102 | | // Records the current flush status of the tablet. |
103 | | // Note: Once its value is set to Failed, it cannot return to SUCCESS. |
104 | | std::shared_mutex _flush_status_lock; |
105 | | Status _flush_status; |
106 | | |
107 | | FlushStatistic _stats; |
108 | | |
109 | | std::shared_ptr<RowsetWriter> _rowset_writer = nullptr; |
110 | | |
111 | | MemTableStat _memtable_stat; |
112 | | |
113 | | std::atomic<bool> _shutdown = false; |
114 | | ThreadPool* _thread_pool = nullptr; |
115 | | |
116 | | std::mutex _mutex; |
117 | | std::condition_variable _submit_task_finish_cond; |
118 | | std::condition_variable _running_task_finish_cond; |
119 | | |
120 | | std::weak_ptr<WorkloadGroup> _wg_wptr; |
121 | | }; |
122 | | |
123 | | // MemTableFlushExecutor is responsible for flushing memtables to disk. |
124 | | // It encapsulate a ThreadPool to handle all tasks. |
125 | | // Usage Example: |
126 | | // ... |
127 | | // std::shared_ptr<FlushHandler> flush_handler; |
128 | | // memTableFlushExecutor.create_flush_token(&flush_handler); |
129 | | // ... |
130 | | // flush_token->submit(memtable) |
131 | | // ... |
132 | | class MemTableFlushExecutor { |
133 | | public: |
134 | 38 | MemTableFlushExecutor() = default; |
135 | 38 | ~MemTableFlushExecutor() { |
136 | 38 | _flush_pool->shutdown(); |
137 | 38 | _high_prio_flush_pool->shutdown(); |
138 | 38 | } |
139 | | |
140 | | // init should be called after storage engine is opened, |
141 | | // because it needs path hash of each data dir. |
142 | | void init(int num_disk); |
143 | | |
144 | | Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, |
145 | | std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority, |
146 | | std::shared_ptr<WorkloadGroup> wg_sptr); |
147 | | |
148 | | // return true if it already has any flushing task |
149 | 0 | bool check_and_inc_has_any_flushing_task() { |
150 | | // need to use CAS instead of only `if (0 == _flushing_task_count)` statement, |
151 | | // to avoid concurrent entries both pass the if statement |
152 | 0 | int expected_count = 0; |
153 | 0 | if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) { |
154 | 0 | return true; |
155 | 0 | } |
156 | 0 | DCHECK(expected_count == 0 && _flushing_task_count == 1); |
157 | 0 | return false; |
158 | 0 | } |
159 | | |
160 | 0 | void inc_flushing_task() { _flushing_task_count++; } |
161 | | |
162 | 0 | void dec_flushing_task() { _flushing_task_count--; } |
163 | | |
164 | 10 | ThreadPool* flush_pool() { return _flush_pool.get(); } |
165 | | |
166 | | void update_memtable_flush_threads(); |
167 | | |
168 | | private: |
169 | | std::unique_ptr<ThreadPool> _flush_pool; |
170 | | std::unique_ptr<ThreadPool> _high_prio_flush_pool; |
171 | | std::atomic<int> _flushing_task_count = 0; |
172 | | int _num_disk = 0; |
173 | | }; |
174 | | |
175 | | } // namespace doris |