/root/doris/be/src/olap/memtable.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <cstring> |
24 | | #include <functional> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <ostream> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/object_pool.h" |
31 | | #include "common/status.h" |
32 | | #include "gutil/integral_types.h" |
33 | | #include "olap/olap_common.h" |
34 | | #include "olap/partial_update_info.h" |
35 | | #include "olap/tablet_schema.h" |
36 | | #include "runtime/memory/mem_tracker.h" |
37 | | #include "runtime/thread_context.h" |
38 | | #include "vec/aggregate_functions/aggregate_function.h" |
39 | | #include "vec/common/arena.h" |
40 | | #include "vec/core/block.h" |
41 | | |
42 | | namespace doris { |
43 | | |
44 | | class Schema; |
45 | | class SlotDescriptor; |
46 | | class TabletSchema; |
47 | | class TupleDescriptor; |
48 | | enum KeysType : int; |
49 | | |
// Handle to one row of the memtable's input block (_input_mutable_block),
// plus the optional location of that row's aggregate states.
struct RowInBlock {
    // Index of the row inside the input block.
    size_t _row_pos;
    // Base address of the aggregate-state memory; null until init_agg_places().
    char* _agg_mem = nullptr;
    // Table of offsets into _agg_mem, indexed by the argument of agg_places();
    // null until init_agg_places().
    size_t* _agg_state_offset = nullptr;
    // True between init_agg_places() and remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos(row) {}

    // Attaches aggregate-state storage to this row and marks it initialized.
    // The pointers are stored as-is; nothing is copied or owned here.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Returns the address of the aggregate state selected by `offset`, i.e.
    // _agg_mem advanced by _agg_state_offset[offset]. No null or bounds checks
    // are performed, so init_agg_places() must have been called first.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    inline bool has_init_agg() const { return _has_init_agg; }

    // Clears only the flag; the stored pointers are left untouched.
    inline void remove_init_agg() { _has_init_agg = false; }
};
71 | | |
// Per-position flag array covering the index range [begin, end).
//
// Every flag starts at 1 and callers may overwrite flags through operator[].
// Tie::Iter walks the array and reports ranges [left, right) such that every
// flag at positions left + 1 .. right - 1 equals 1 and the flag at `right`
// (if right < end) equals 0. Flags are expected to hold only 0 or 1: the
// search below looks for those exact byte values.
// NOTE(review): presumably flag[i] == 1 means "row i still compares equal to
// row i - 1" during column-by-column sorting — confirm against the callers.
class Tie {
public:
    // Forward-only cursor over the ranges described above.
    // Holds a reference to the Tie, which must outlive the iterator.
    class Iter {
    public:
        // Starts scanning at begin + 1: the flag at `begin` itself is never
        // inspected because the first position has no predecessor.
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Bounds of the range found by the last successful next().
        // Both are 0 until next() has returned true at least once.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next range.
        // Returns false when there are no more ranges; left()/right() are
        // left unchanged in that case.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // Skip ahead to the first flag that is set.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            // The range starts one position before the first set flag ...
            _left = _next - 1;
            // ... and stops at the next cleared flag (or at end).
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the first position in [start, end) whose flag equals
        // `value`, or end when there is none (or start is already past end).
        size_t _find(uint8_t value, size_t start) {
            if (start >= _tie._end) {
                return start;
            }
            size_t offset = start - _tie._begin;
            size_t size = _tie._end - start;
            void* p = std::memchr(_tie._bits.data() + offset, value, size);
            if (p == nullptr) {
                return _tie._end;
            }
            // Translate the byte address back into an absolute position.
            return static_cast<uint8_t*>(p) - _tie._bits.data() + _tie._begin;
        }

        Tie& _tie;
        // Zero-initialized so that left()/right() never read indeterminate
        // values when called before the first successful next().
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    // Creates end - begin flags, all set to 1. Requires begin <= end.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(_end - _begin, 1) {}

    // Flag access by absolute position i, expected to lie in [begin, end);
    // no bounds checking is performed.
    uint8_t operator[](int i) const { return _bits[static_cast<size_t>(i) - _begin]; }
    uint8_t& operator[](int i) { return _bits[static_cast<size_t>(i) - _begin]; }

    // Returns a fresh iterator positioned before the first range.
    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
129 | | |
130 | | class RowInBlockComparator { |
131 | | public: |
132 | | RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema) |
133 | 11 | : _tablet_schema(tablet_schema) {} |
134 | | // call set_block before operator(). |
135 | | // only first time insert block to create _input_mutable_block, |
136 | | // so can not Comparator of construct to set pblock |
137 | 18 | void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; } |
138 | | int operator()(const RowInBlock* left, const RowInBlock* right) const; |
139 | | |
140 | | private: |
141 | | std::shared_ptr<TabletSchema> _tablet_schema; |
142 | | vectorized::MutableBlock* _pblock = nullptr; // corresponds to Memtable::_input_mutable_block |
143 | | }; |
144 | | |
// Counters and timings collected for a memtable. Row and invocation counters
// are atomics; the nanosecond timings are plain integers.
class MemTableStat {
public:
    // Accumulates every field of `stat` into this object and returns *this.
    // Each atomic counter is read once from `stat` and added with a single
    // atomic read-modify-write; the operation as a whole is not atomic.
    MemTableStat& operator+=(const MemTableStat& stat) {
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());

        sort_ns = sort_ns + stat.sort_ns;
        agg_ns = agg_ns + stat.agg_ns;
        put_into_output_ns = put_into_output_ns + stat.put_into_output_ns;
        duration_ns = duration_ns + stat.duration_ns;

        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());

        return *this;
    }

    // Row counters.
    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    // Elapsed times, in nanoseconds.
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    // Invocation counters.
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
169 | | |
170 | | class MemTable { |
171 | | public: |
172 | | MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
173 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
174 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, |
175 | | const std::shared_ptr<MemTracker>& insert_mem_tracker, |
176 | | const std::shared_ptr<MemTracker>& flush_mem_tracker); |
177 | | ~MemTable(); |
178 | | |
179 | 8 | int64_t tablet_id() const { return _tablet_id; } |
180 | 30 | size_t memory_usage() const { |
181 | 30 | return _insert_mem_tracker->consumption() + _arena->used_size() + |
182 | 30 | _flush_mem_tracker->consumption(); |
183 | 30 | } |
184 | | // insert tuple from (row_pos) to (row_pos+num_rows) |
185 | | Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); |
186 | | |
187 | | void shrink_memtable_by_agg(); |
188 | | |
189 | | bool need_flush() const; |
190 | | |
191 | | bool need_agg() const; |
192 | | |
193 | | Status to_block(std::unique_ptr<vectorized::Block>* res); |
194 | | |
195 | 11 | bool empty() const { return _input_mutable_block.rows() == 0; } |
196 | | |
197 | 8 | const MemTableStat& stat() { return _stat; } |
198 | | |
199 | 8 | std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; } |
200 | | |
201 | 0 | QueryThreadContext query_thread_context() { return _query_thread_context; } |
202 | | |
203 | | private: |
204 | | // for vectorized |
205 | | void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, |
206 | | RowInBlock* row_in_skiplist); |
207 | | |
208 | | // Used to wrapped by to_block to do exception handle logic |
209 | | Status _to_block(std::unique_ptr<vectorized::Block>* res); |
210 | | |
211 | | private: |
212 | | int64_t _tablet_id; |
213 | | bool _enable_unique_key_mow = false; |
214 | | bool _is_partial_update = false; |
215 | | const KeysType _keys_type; |
216 | | std::shared_ptr<TabletSchema> _tablet_schema; |
217 | | |
218 | | std::shared_ptr<RowInBlockComparator> _vec_row_comparator; |
219 | | |
220 | | QueryThreadContext _query_thread_context; |
221 | | |
222 | | // `_insert_manual_mem_tracker` manually records the memory value of memtable insert() |
223 | | // `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook. |
224 | | // Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable |
225 | | // when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit. |
226 | | std::shared_ptr<MemTracker> _insert_mem_tracker; |
227 | | std::shared_ptr<MemTracker> _flush_mem_tracker; |
228 | | // Only the rows will be inserted into block can allocate memory from _arena. |
229 | | // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually |
230 | | // reduce the number of segment files that are generated by current load |
231 | | std::unique_ptr<vectorized::Arena> _arena; |
232 | | // The object buffer pool for convert tuple to row |
233 | | ObjectPool _agg_buffer_pool; |
234 | | |
235 | | void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
236 | | const TupleDescriptor* tuple_desc); |
237 | | std::vector<int> _column_offset; |
238 | | |
239 | | // Number of rows inserted to this memtable. |
240 | | // This is not the rows in this memtable, because rows may be merged |
241 | | // in unique or aggregate key model. |
242 | | MemTableStat _stat; |
243 | | |
244 | | //for vectorized |
245 | | vectorized::MutableBlock _input_mutable_block; |
246 | | vectorized::MutableBlock _output_mutable_block; |
247 | | size_t _last_sorted_pos = 0; |
248 | | |
249 | | //return number of same keys |
250 | | size_t _sort(); |
251 | | Status _sort_by_cluster_keys(); |
252 | | void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, |
253 | | std::function<int(const RowInBlock*, const RowInBlock*)> cmp); |
254 | | template <bool is_final> |
255 | | void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, |
256 | | int row_pos); |
257 | | template <bool is_final> |
258 | | void _aggregate(); |
259 | | Status _put_into_output(vectorized::Block& in_block); |
260 | | bool _is_first_insertion; |
261 | | |
262 | | void _init_agg_functions(const vectorized::Block* block); |
263 | | std::vector<vectorized::AggregateFunctionPtr> _agg_functions; |
264 | | std::vector<size_t> _offsets_of_aggregate_states; |
265 | | size_t _total_size_of_aggregate_states; |
266 | | std::vector<RowInBlock*> _row_in_blocks; |
267 | | // Memory usage without _arena. |
268 | | size_t _mem_usage; |
269 | | |
270 | | size_t _num_columns; |
271 | | int32_t _seq_col_idx_in_block = -1; |
272 | | |
273 | | bool _is_partial_update_and_auto_inc = false; |
274 | | }; // class MemTable |
275 | | |
276 | | } // namespace doris |