/root/doris/be/src/olap/memtable.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "common/status.h"
#include "olap/partial_update_info.h"
#include "olap/tablet_schema.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/arena.h"
#include "vec/common/custom_allocator.h"
#include "vec/core/block.h"
37 | | |
38 | | namespace doris { |
39 | | |
// Forward declarations: these types are only named in declarations below
// (pointers, references, members initialised out of line), so their full
// definitions are not required at this point.
class Schema;
class SlotDescriptor;
class TabletSchema;
class TupleDescriptor;

// Opaque declaration of the key-model enum; the fixed underlying type (int)
// makes it usable as a complete type here.
enum KeysType : int;
45 | | |
// Lifecycle state of a memtable.
enum MemType {
    // The memtable is currently used by the writer to insert blocks into.
    ACTIVE = 0,
    // The memtable has finished writing blocks and sits in the queue waiting for flush.
    WRITE_FINISHED = 1,
    // The memtable is being flushed, i.e. its segment is being written to disk.
    FLUSH = 2,
};
50 | | |
// Handle for one row, identified by its position in _input_mutable_block,
// plus optional bookkeeping for the row's aggregate states.
struct RowInBlock {
    // Row position inside the block.
    size_t _row_pos;
    // Base address of the aggregate-state memory; null until init_agg_places().
    char* _agg_mem = nullptr;
    // Table of per-slot byte offsets into _agg_mem; null until init_agg_places().
    size_t* _agg_state_offset = nullptr;
    // Whether init_agg_places() has been called and not undone by remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos {row} {}

    // Records where this row's aggregate states live and marks them as initialised.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Returns the address of the aggregate state whose offset is stored at
    // index `offset` of the offset table.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return _agg_mem + byte_offset;
    }

    inline bool has_init_agg() const { return _has_init_agg; }

    // Clears the "initialised" flag only; the stored pointers are left as they are.
    inline void remove_init_agg() { _has_init_agg = false; }
};
72 | | |
// One uint8_t flag per position in [begin, end); every flag starts out as 1.
// Flags are addressed by absolute position through operator[], and Iter walks
// the runs of consecutive 1-flags.
// NOTE(review): presumably flag[i] == 1 means "row i still ties with row i - 1"
// during multi-column sorting -- confirm against MemTable::_sort_one_column.
class Tie {
public:
    // Enumerates half-open ranges [left(), right()) such that every flag at
    // positions left() + 1 .. right() - 1 is 1. The flag at `begin` itself is
    // never inspected: scanning starts at begin + 1.
    class Iter {
    public:
        Iter(Tie& tie) : _tie(tie), _next(tie._begin + 1) {}

        // Bounds of the range found by the last successful next().
        // Both are 0 until next() has returned true at least once.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next range. Returns false when there are no more
        // ranges; left()/right() keep their previous values in that case.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // Skip 0-flags up to the first 1-flag: that is where a run starts.
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            // The position just before the first 1-flag opens the range.
            _left = _next - 1;
            // The run stops at the first 0-flag, or at `end` if there is none.
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Returns the absolute position of the first flag equal to `value` at
        // or after `start`, or the tie's end when there is no such flag.
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const size_t offset = start - _tie._begin;
            const size_t size = _tie._end - start;
            void* p = std::memchr(_tie._bits.data() + offset, value, size);
            if (p == nullptr) {
                return _tie._end;
            }
            return static_cast<uint8_t*>(p) - _tie._bits.data() + _tie._begin;
        }

    private:
        Tie& _tie;
        // Zero-initialised so that left()/right() are well defined even before
        // the first successful next().
        size_t _left = 0;
        size_t _right = 0;
        size_t _next;
    };

public:
    // Creates end - begin flags, all set to 1. Callers must pass begin <= end.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, uint8_t {1}) {}

    // Flag access by absolute position `i`, which must lie in [begin, end).
    // The index is a size_t like every other position in this class, so large
    // row positions are not narrowed on the way in.
    uint8_t operator[](size_t i) const { return _bits[i - _begin]; }
    uint8_t& operator[](size_t i) { return _bits[i - _begin]; }

    // Returns a fresh iterator positioned before the first range.
    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
130 | | |
131 | | class RowInBlockComparator { |
132 | | public: |
133 | | RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema) |
134 | 15 | : _tablet_schema(tablet_schema) {} |
135 | | // call set_block before operator(). |
136 | | // only first time insert block to create _input_mutable_block, |
137 | | // so can not Comparator of construct to set pblock |
138 | 27 | void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; } |
139 | | int operator()(const RowInBlock* left, const RowInBlock* right) const; |
140 | | |
141 | | private: |
142 | | std::shared_ptr<TabletSchema> _tablet_schema; |
143 | | vectorized::MutableBlock* _pblock = nullptr; // corresponds to Memtable::_input_mutable_block |
144 | | }; |
145 | | |
// Counters and timings collected for a memtable; operator+= folds another
// instance into this one field by field.
class MemTableStat {
public:
    // Adds every field of `stat` to the matching field of *this and returns *this.
    MemTableStat& operator+=(const MemTableStat& stat) {
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());
        sort_ns = sort_ns + stat.sort_ns;
        agg_ns = agg_ns + stat.agg_ns;
        put_into_output_ns = put_into_output_ns + stat.put_into_output_ns;
        duration_ns = duration_ns + stat.duration_ns;
        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());

        return *this;
    }

    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
170 | | |
171 | | class MemTable { |
172 | | public: |
173 | | MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
174 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
175 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info); |
176 | | ~MemTable(); |
177 | | |
178 | 12 | int64_t tablet_id() const { return _tablet_id; } |
179 | 49 | size_t memory_usage() const { return _mem_tracker->consumption(); } |
180 | | size_t get_flush_reserve_memory_size() const; |
181 | | // insert tuple from (row_pos) to (row_pos+num_rows) |
182 | | Status insert(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs); |
183 | | |
184 | | void shrink_memtable_by_agg(); |
185 | | |
186 | | bool need_flush() const; |
187 | | |
188 | | bool need_agg() const; |
189 | | |
190 | | Status to_block(std::unique_ptr<vectorized::Block>* res); |
191 | | |
192 | 15 | bool empty() const { return _input_mutable_block.rows() == 0; } |
193 | | |
194 | 12 | const MemTableStat& stat() { return _stat; } |
195 | | |
196 | 0 | std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; } |
197 | | |
198 | 0 | std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } |
199 | | |
200 | 12 | void set_flush_success() { _is_flush_success = true; } |
201 | | |
202 | 0 | MemType get_mem_type() { return _mem_type; } |
203 | | |
204 | 27 | void update_mem_type(MemType memtype) { _mem_type = memtype; } |
205 | | |
206 | | private: |
207 | | // for vectorized |
208 | | template <bool has_skip_bitmap_col> |
209 | | void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, |
210 | | RowInBlock* row_in_skiplist); |
211 | | |
212 | | // Used to wrapped by to_block to do exception handle logic |
213 | | Status _to_block(std::unique_ptr<vectorized::Block>* res); |
214 | | |
215 | | private: |
216 | | std::atomic<MemType> _mem_type; |
217 | | int64_t _tablet_id; |
218 | | bool _enable_unique_key_mow = false; |
219 | | bool _is_flush_success = false; |
220 | | UniqueKeyUpdateModePB _partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
221 | | const KeysType _keys_type; |
222 | | std::shared_ptr<TabletSchema> _tablet_schema; |
223 | | |
224 | | std::shared_ptr<RowInBlockComparator> _vec_row_comparator; |
225 | | |
226 | | std::shared_ptr<ResourceContext> _resource_ctx; |
227 | | |
228 | | std::shared_ptr<MemTracker> _mem_tracker; |
229 | | // Only the rows will be inserted into block can allocate memory from _arena. |
230 | | // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually |
231 | | // reduce the number of segment files that are generated by current load |
232 | | std::unique_ptr<vectorized::Arena> _arena; |
233 | | |
234 | | void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
235 | | const TupleDescriptor* tuple_desc); |
236 | | std::vector<int> _column_offset; |
237 | | |
238 | | // Number of rows inserted to this memtable. |
239 | | // This is not the rows in this memtable, because rows may be merged |
240 | | // in unique or aggregate key model. |
241 | | MemTableStat _stat; |
242 | | |
243 | | //for vectorized |
244 | | vectorized::MutableBlock _input_mutable_block; |
245 | | vectorized::MutableBlock _output_mutable_block; |
246 | | size_t _last_sorted_pos = 0; |
247 | | |
248 | | //return number of same keys |
249 | | size_t _sort(); |
250 | | Status _sort_by_cluster_keys(); |
251 | | void _sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie, |
252 | | std::function<int(const RowInBlock*, const RowInBlock*)> cmp); |
253 | | template <bool is_final> |
254 | | void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, |
255 | | int row_pos); |
256 | | template <bool is_final, bool has_skip_bitmap_col = false> |
257 | | void _aggregate(); |
258 | | Status _put_into_output(vectorized::Block& in_block); |
259 | | bool _is_first_insertion; |
260 | | |
261 | | void _init_agg_functions(const vectorized::Block* block); |
262 | | std::vector<vectorized::AggregateFunctionPtr> _agg_functions; |
263 | | std::vector<size_t> _offsets_of_aggregate_states; |
264 | | size_t _total_size_of_aggregate_states; |
265 | | std::unique_ptr<DorisVector<RowInBlock*>> _row_in_blocks; |
266 | | |
267 | | size_t _num_columns; |
268 | | int32_t _seq_col_idx_in_block = -1; |
269 | | int32_t _skip_bitmap_col_idx {-1}; |
270 | | int32_t _seq_col_unique_id {-1}; |
271 | | |
272 | | bool _is_partial_update_and_auto_inc = false; |
273 | | }; // class MemTable |
274 | | |
275 | | } // namespace doris |