/root/doris/be/src/olap/memtable.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"
#include "vec/core/block.h"
38 | | |
39 | | namespace doris { |
40 | | |
41 | | class Schema; |
42 | | class SlotDescriptor; |
43 | | class TabletSchema; |
44 | | class TupleDescriptor; |
45 | | enum KeysType : int; |
46 | | |
// Lifecycle stage of a MemTable:
//   ACTIVE         - the writer is still inserting blocks into the memtable.
//   WRITE_FINISHED - writing is done; the memtable sits in the queue waiting to be flushed.
//   FLUSH          - the memtable is being flushed, i.e. its segment is written to disk.
enum MemType {
    ACTIVE = 0,
    WRITE_FINISHED = 1,
    FLUSH = 2,
};
51 | | |
// One row of MemTable::_input_mutable_block, identified by its position, together with
// an optional aggregation buffer attached via init_agg_places().
struct RowInBlock {
    // Position of the row inside the input block.
    size_t _row_pos;
    // Base of the row's aggregation buffer; only meaningful after init_agg_places().
    char* _agg_mem = nullptr;
    // Per-slot byte offsets into _agg_mem, indexed by the `offset` passed to agg_places().
    size_t* _agg_state_offset = nullptr;
    // Whether init_agg_places() has attached a buffer that is still considered live.
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos(row) {}

    // Attaches the aggregation buffer and its offset table, and marks the row as initialized.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Address of the aggregation slot number `offset` inside the attached buffer.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    bool has_init_agg() const { return _has_init_agg; }

    // Clears only the "initialized" flag; the buffer pointers are left as they are.
    void remove_init_agg() { _has_init_agg = false; }
};
73 | | |
// Bookkeeping for a multi-column sort over the row range [begin, end). The entry for an
// absolute row index i is 1 while row i is still tied with row i - 1, and 0 once the two
// rows have been told apart. All entries start at 1 (every row tied). The entry for
// `begin` is never read by Iter, since that row has no predecessor inside the range.
// NOTE(review): presumably updated column by column by MemTable::_sort_one_column.
class Tie {
public:
    // Enumerates the maximal runs of tied rows. After each successful next(), the run is
    // the half-open range [left(), right()), which always contains at least two rows.
    class Iter {
    public:
        Iter(Tie& tie)
                : _tie(tie), _left(tie._begin), _right(tie._begin), _next(tie._begin + 1) {}

        // Bounds of the run found by the last successful next(). Before the first
        // successful call they describe the empty range [begin, begin) rather than
        // holding indeterminate values.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Moves to the next run of tied rows; returns false when there are no more runs.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // A run starts at the row preceding the first entry that is still tied...
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            _left = _next - 1;
            // ...and ends right before the first entry that is no longer tied (or at end).
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the first row index in [start, end) whose entry equals `value`, `end`
        // when there is none, or `start` unchanged when it is already at or past `end`.
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const uint8_t* base = _tie._bits.data();
            const void* hit =
                    std::memchr(base + (start - _tie._begin), value, _tie._end - start);
            if (hit == nullptr) {
                return _tie._end;
            }
            return static_cast<size_t>(static_cast<const uint8_t*>(hit) - base) + _tie._begin;
        }

    private:
        Tie& _tie;
        size_t _left;
        size_t _right;
        size_t _next;
    };

public:
    // Creates tie bookkeeping for rows [begin, end) with every row tied to its predecessor.
    // Assumes begin <= end.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, 1) {}

    // Entry for the absolute row index i; i must lie in [begin, end).
    uint8_t operator[](size_t i) const { return _bits[i - _begin]; }
    uint8_t& operator[](size_t i) { return _bits[i - _begin]; }
    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
131 | | |
132 | | class RowInBlockComparator { |
133 | | public: |
134 | | RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema) |
135 | 15 | : _tablet_schema(tablet_schema) {} |
136 | | // call set_block before operator(). |
137 | | // only first time insert block to create _input_mutable_block, |
138 | | // so can not Comparator of construct to set pblock |
139 | 27 | void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; } |
140 | | int operator()(const RowInBlock* left, const RowInBlock* right) const; |
141 | | |
142 | | private: |
143 | | std::shared_ptr<TabletSchema> _tablet_schema; |
144 | | vectorized::MutableBlock* _pblock = nullptr; // corresponds to Memtable::_input_mutable_block |
145 | | }; |
146 | | |
// Statistics collected by a MemTable: row counters, per-phase timings (the *_ns fields,
// presumably nanoseconds), and how many times sorting / aggregation ran. The row counters
// and the sort/agg invocation counters are std::atomic; the timings are plain integers.
class MemTableStat {
public:
    // Adds every field of `stat` into this instance and returns *this.
    MemTableStat& operator+=(const MemTableStat& stat) {
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());
        sort_ns = sort_ns + stat.sort_ns;
        agg_ns = agg_ns + stat.agg_ns;
        put_into_output_ns = put_into_output_ns + stat.put_into_output_ns;
        duration_ns = duration_ns + stat.duration_ns;
        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());
        return *this;
    }

    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
171 | | |
172 | | class MemTable { |
173 | | public: |
174 | | MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
175 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
176 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, |
177 | | const std::shared_ptr<ResourceContext>& resource_ctx); |
178 | | ~MemTable(); |
179 | | |
180 | 12 | int64_t tablet_id() const { return _tablet_id; } |
181 | 49 | size_t memory_usage() const { return _mem_tracker->consumption(); } |
182 | | size_t get_flush_reserve_memory_size() const; |
183 | | // insert tuple from (row_pos) to (row_pos+num_rows) |
184 | | Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); |
185 | | |
186 | | void shrink_memtable_by_agg(); |
187 | | |
188 | | bool need_flush() const; |
189 | | |
190 | | bool need_agg() const; |
191 | | |
192 | | Status to_block(std::unique_ptr<vectorized::Block>* res); |
193 | | |
194 | 15 | bool empty() const { return _input_mutable_block.rows() == 0; } |
195 | | |
196 | 12 | const MemTableStat& stat() { return _stat; } |
197 | | |
198 | 48 | std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; } |
199 | | |
200 | 12 | std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } |
201 | | |
202 | 12 | void set_flush_success() { _is_flush_success = true; } |
203 | | |
204 | 0 | MemType get_mem_type() { return _mem_type; } |
205 | | |
206 | 27 | void update_mem_type(MemType memtype) { _mem_type = memtype; } |
207 | | |
208 | 20 | int64_t raw_rows() { return _stat.raw_rows.load(); } |
209 | | |
210 | | private: |
211 | | // for vectorized |
212 | | template <bool has_skip_bitmap_col> |
213 | | void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, |
214 | | RowInBlock* row_in_skiplist); |
215 | | |
216 | | // Used to wrapped by to_block to do exception handle logic |
217 | | Status _to_block(std::unique_ptr<vectorized::Block>* res); |
218 | | |
219 | | int64_t _adaptive_write_buffer_size() const; |
220 | | |
221 | | private: |
222 | | std::atomic<MemType> _mem_type; |
223 | | int64_t _tablet_id; |
224 | | bool _enable_unique_key_mow = false; |
225 | | bool _is_flush_success = false; |
226 | | UniqueKeyUpdateModePB _partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
227 | | const KeysType _keys_type; |
228 | | std::shared_ptr<TabletSchema> _tablet_schema; |
229 | | |
230 | | std::shared_ptr<RowInBlockComparator> _vec_row_comparator; |
231 | | |
232 | | std::shared_ptr<ResourceContext> _resource_ctx; |
233 | | |
234 | | std::shared_ptr<MemTracker> _mem_tracker; |
235 | | // Only the rows will be inserted into block can allocate memory from _arena. |
236 | | // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually |
237 | | // reduce the number of segment files that are generated by current load |
238 | | vectorized::Arena _arena; |
239 | | int64_t _load_mem_limit = -1; |
240 | | |
241 | | void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
242 | | const TupleDescriptor* tuple_desc); |
243 | | std::vector<int> _column_offset; |
244 | | |
245 | | // Number of rows inserted to this memtable. |
246 | | // This is not the rows in this memtable, because rows may be merged |
247 | | // in unique or aggregate key model. |
248 | | MemTableStat _stat; |
249 | | |
250 | | //for vectorized |
251 | | vectorized::MutableBlock _input_mutable_block; |
252 | | vectorized::MutableBlock _output_mutable_block; |
253 | | size_t _last_sorted_pos = 0; |
254 | | size_t _last_agg_pos = 0; |
255 | | |
256 | | //return number of same keys |
257 | | size_t _sort(); |
258 | | Status _sort_by_cluster_keys(); |
259 | | void _sort_one_column(DorisVector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie, |
260 | | std::function<int(RowInBlock*, RowInBlock*)> cmp); |
261 | | template <bool is_final> |
262 | | void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, |
263 | | int row_pos); |
264 | | void _init_row_for_agg(RowInBlock* row, vectorized::MutableBlock& mutable_block); |
265 | | void _clear_row_agg(RowInBlock* row); |
266 | | |
267 | | template <bool is_final, bool has_skip_bitmap_col = false> |
268 | | void _aggregate(); |
269 | | |
270 | | template <bool is_final> |
271 | | void _aggregate_for_flexible_partial_update_without_seq_col( |
272 | | const vectorized::ColumnsWithTypeAndName& block_data, |
273 | | vectorized::MutableBlock& mutable_block, |
274 | | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks); |
275 | | |
276 | | template <bool is_final> |
277 | | void _aggregate_for_flexible_partial_update_with_seq_col( |
278 | | const vectorized::ColumnsWithTypeAndName& block_data, |
279 | | vectorized::MutableBlock& mutable_block, |
280 | | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks); |
281 | | |
282 | | Status _put_into_output(vectorized::Block& in_block); |
283 | | bool _is_first_insertion; |
284 | | |
285 | | void _init_agg_functions(const vectorized::Block* block); |
286 | | std::vector<vectorized::AggregateFunctionPtr> _agg_functions; |
287 | | std::vector<size_t> _offsets_of_aggregate_states; |
288 | | size_t _total_size_of_aggregate_states; |
289 | | std::unique_ptr<DorisVector<std::shared_ptr<RowInBlock>>> _row_in_blocks; |
290 | | |
291 | | size_t _num_columns; |
292 | | int32_t _seq_col_idx_in_block {-1}; |
293 | | int32_t _skip_bitmap_col_idx {-1}; |
294 | | int32_t _delete_sign_col_idx {-1}; |
295 | | int32_t _delete_sign_col_unique_id {-1}; |
296 | | int32_t _seq_col_unique_id {-1}; |
297 | | |
298 | | bool _is_partial_update_and_auto_inc = false; |
299 | | }; // class MemTable |
300 | | |
301 | | } // namespace doris |