be/src/load/memtable/memtable_flush_executor.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <atomic> |
21 | | #include <condition_variable> |
22 | | #include <cstdint> |
23 | | #include <iosfwd> |
24 | | #include <memory> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "load/delta_writer/delta_writer_context.h" |
30 | | #include "load/memtable/memtable.h" |
31 | | #include "util/threadpool.h" |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | class DataDir; |
36 | | class MemTable; |
37 | | class MemTableMemoryLimiter; |
38 | | class Block; |
39 | | class GroupRowsetWriter; |
40 | | class OlapTableSchemaParam; |
41 | | class AutoIncIDBuffer; |
42 | | class RowsetWriter; |
43 | | class SystemMetrics; |
44 | | class WorkloadGroup; |
45 | | |
46 | | // Flush statistics of a single flush token (FlushToken keeps one as `_stats`). |
47 | | // The fields are atomic because they may be updated by multiple threads. |
48 | | struct FlushStatistic { |
49 | | std::atomic_uint64_t flush_time_ns = 0; |
50 | | std::atomic_uint64_t flush_submit_count = 0; |
51 | | std::atomic_int64_t flush_running_count = 0; |
52 | | std::atomic_uint64_t flush_finish_count = 0; |
53 | | std::atomic_uint64_t flush_size_bytes = 0; |
54 | | std::atomic_uint64_t flush_disk_size_bytes = 0; |
55 | | std::atomic_uint64_t flush_wait_time_ns = 0; |
56 | | }; |
57 | | |
58 | | struct SharedMemtable { |
59 | | std::shared_ptr<MemTable> memtable; |
60 | | int32_t segment_id = 0; |
61 | | |
62 | | ~SharedMemtable(); |
63 | | |
64 | | std::once_flag block_once; |
65 | | Status block_status; |
66 | | std::shared_ptr<Block> block; |
67 | | |
68 | | std::atomic<int> finished_sub_task_count {0}; |
69 | | // expected number of sub-tasks; defaults to 2 (data + binlog) |
70 | | std::atomic<int> total_sub_task_count {2}; |
71 | | |
72 | 4 | int add_finished_sub_task() { return finished_sub_task_count.fetch_add(1); } |
73 | | |
74 | 0 | std::string debug_string() const { |
75 | 0 | return "PartOfGroupMemtableFlushTask{segment_id=" + std::to_string(segment_id) + |
76 | 0 | ", finished_sub_task_count=" + std::to_string(finished_sub_task_count.load()) + |
77 | 0 | ", total_sub_task_count=" + std::to_string(total_sub_task_count.load()) + "}"; |
78 | 0 | } |
79 | | }; |
80 | | |
81 | | std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); |
82 | | |
83 | | // A thin wrapper of ThreadPoolToken to submit tasks. |
84 | | // For a tablet, there may be multiple memtables, which will be flushed to disk |
85 | | // one by one in the order of generation. |
86 | | // If a memtable flush fails, then: |
87 | | // 1. Immediately disallow submission of any subsequent memtable; |
88 | | // 2. For the memtables that have already been submitted, there is no need to flush, |
89 | | // because the entire job will definitely fail. |
90 | | class FlushToken : public std::enable_shared_from_this<FlushToken> { |
91 | | ENABLE_FACTORY_CREATOR(FlushToken); |
92 | | |
93 | | public: |
94 | | FlushToken(ThreadPool* thread_pool, std::shared_ptr<WorkloadGroup> wg_sptr) |
95 | 18 | : _flush_status(Status::OK()), _thread_pool(thread_pool), _wg_wptr(wg_sptr) {} |
96 | | |
97 | | Status submit(std::shared_ptr<MemTable> mem_table); |
98 | | |
99 | | // An error has happened, so we cancel this token |
100 | | // and remove all tasks in the queue. |
101 | | void cancel(); |
102 | | |
103 | | // Wait for all tasks in the token to be completed. |
104 | | Status wait(); |
105 | | |
106 | | // Get the statistics of the flush operations. |
107 | 41 | const FlushStatistic& get_stats() const { return _stats; } |
108 | | |
109 | 18 | void set_rowset_writer(std::shared_ptr<RowsetWriter> rowset_writer) { |
110 | 18 | _rowset_writer = rowset_writer; |
111 | 18 | } |
112 | | |
113 | 18 | void set_table_schema_param(std::shared_ptr<OlapTableSchemaParam> table_schema_param) { |
114 | 18 | _table_schema_param = std::move(table_schema_param); |
115 | 18 | } |
116 | | |
117 | | #ifdef BE_TEST |
118 | | void set_row_binlog_lsn_buffer_for_test( |
119 | 3 | std::shared_ptr<AutoIncIDBuffer> row_binlog_lsn_buffer) { |
120 | 3 | _row_binlog_lsn_buffer = std::move(row_binlog_lsn_buffer); |
121 | 3 | } |
122 | | #endif |
123 | | |
124 | 30 | const MemTableStat& memtable_stat() { return _memtable_stat; } |
125 | | |
126 | | private: |
127 | 32 | void _shutdown_flush_token() { _shutdown.store(true); } |
128 | 36 | bool _is_shutdown() { return _shutdown.load(); } |
129 | | void _wait_submit_task_finish(); |
130 | | void _wait_running_task_finish(); |
131 | | |
132 | | private: |
133 | | friend class MemtableFlushTask; |
134 | | friend class PartOfGroupMemtableFlushTask; |
135 | | |
136 | | Status _submit_sub_tasks(ThreadPool* pool, std::vector<std::shared_ptr<Runnable>> sub_tasks); |
137 | | |
138 | | void _flush_memtable_impl(RowsetWriter* flush_writer, MemTable* memtable, int32_t segment_id, |
139 | | int64_t submit_task_time, SharedMemtable* shared_memtable = nullptr); |
140 | | |
141 | | void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id, |
142 | | int64_t submit_task_time); |
143 | | |
144 | | void _flush_group_memtable(std::shared_ptr<SharedMemtable> shared_memtable, |
145 | | WriteRequestType write_req_type, int64_t submit_task_time); |
146 | | |
147 | | Status _memtable2block(MemTable* memtable, SharedMemtable* shared_memtable, |
148 | | std::shared_ptr<Block>& flush_block); |
149 | | |
150 | | Status _try_reserve_memory(const std::shared_ptr<ResourceContext>& resource_context, |
151 | | int64_t size); |
152 | | |
153 | | // Records the current flush status of the tablet. |
154 | | // Note: once its value is set to a failure, it cannot return to success. |
155 | | std::shared_mutex _flush_status_lock; |
156 | | Status _flush_status; |
157 | | |
158 | | FlushStatistic _stats; |
159 | | |
160 | | std::shared_ptr<RowsetWriter> _rowset_writer = nullptr; |
161 | | |
162 | | std::shared_ptr<OlapTableSchemaParam> _table_schema_param = nullptr; |
163 | | std::shared_ptr<AutoIncIDBuffer> _row_binlog_lsn_buffer = nullptr; |
164 | | |
165 | | MemTableStat _memtable_stat; |
166 | | |
167 | | std::atomic<bool> _shutdown = false; |
168 | | ThreadPool* _thread_pool = nullptr; |
169 | | |
170 | | std::mutex _mutex; |
171 | | std::condition_variable _submit_task_finish_cond; |
172 | | std::condition_variable _running_task_finish_cond; |
173 | | |
174 | | std::weak_ptr<WorkloadGroup> _wg_wptr; |
175 | | }; |
176 | | |
177 | | // MemTableFlushExecutor is responsible for flushing memtables to disk. |
178 | | // It encapsulates two ThreadPools (normal and high priority) to handle all tasks. |
179 | | // Usage Example: |
180 | | // ... |
181 | | // std::shared_ptr<FlushToken> flush_token; |
182 | | // memTableFlushExecutor.create_flush_token(flush_token, rowset_writer, is_high_priority, wg_sptr); |
183 | | // ... |
184 | | // flush_token->submit(memtable) |
185 | | // ... |
186 | | class MemTableFlushExecutor { |
187 | | public: |
188 | 43 | MemTableFlushExecutor() = default; |
189 | 43 | ~MemTableFlushExecutor() { |
190 | 43 | _flush_pool->shutdown(); |
191 | 43 | _high_prio_flush_pool->shutdown(); |
192 | 43 | } |
193 | | |
194 | | // init should be called after storage engine is opened, |
195 | | // because it needs path hash of each data dir. |
196 | | void init(int num_disk); |
197 | | |
198 | | Status create_flush_token(std::shared_ptr<FlushToken>& flush_token, |
199 | | std::shared_ptr<RowsetWriter> rowset_writer, bool is_high_priority, |
200 | | std::shared_ptr<WorkloadGroup> wg_sptr, |
201 | | std::shared_ptr<OlapTableSchemaParam> table_schema_param = nullptr); |
202 | | |
203 | | // Returns true if a flushing task already exists; otherwise sets the count from 0 to 1 and returns false. |
204 | 0 | bool check_and_inc_has_any_flushing_task() { |
205 | | // Use CAS instead of a plain `if (0 == _flushing_task_count)` check, |
206 | | // so that concurrent callers cannot both pass the check. |
207 | 0 | int expected_count = 0; |
208 | 0 | if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) { |
209 | 0 | return true; |
210 | 0 | } |
211 | 0 | DCHECK(expected_count == 0 && _flushing_task_count == 1); |
212 | 0 | return false; |
213 | 0 | } |
214 | | |
215 | 0 | void inc_flushing_task() { _flushing_task_count++; } |
216 | | |
217 | 0 | void dec_flushing_task() { _flushing_task_count--; } |
218 | | |
219 | 10 | ThreadPool* flush_pool() { return _flush_pool.get(); } |
220 | | |
221 | 0 | ThreadPool* high_prio_flush_pool() { return _high_prio_flush_pool.get(); } |
222 | | |
223 | | void update_memtable_flush_threads(); |
224 | | |
225 | | // Returns {min_threads, max_threads} for a flush thread pool. |
226 | | // thread_num_per_store is used as the baseline when adaptive mode is off. |
227 | | static std::pair<int, int> calc_flush_thread_count(int num_cpus, int num_disk, |
228 | | int thread_num_per_store); |
229 | | |
230 | | private: |
231 | | std::unique_ptr<ThreadPool> _flush_pool; |
232 | | std::unique_ptr<ThreadPool> _high_prio_flush_pool; |
233 | | std::atomic<int> _flushing_task_count = 0; |
234 | | int _num_disk = 0; |
235 | | }; |
236 | | |
237 | | } // namespace doris |