/root/doris/be/src/olap/memtable.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <cstring> |
24 | | #include <functional> |
25 | | #include <memory> |
26 | | #include <optional> |
27 | | #include <ostream> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/object_pool.h" |
31 | | #include "common/status.h" |
32 | | #include "gutil/integral_types.h" |
33 | | #include "olap/olap_common.h" |
34 | | #include "olap/partial_update_info.h" |
35 | | #include "olap/tablet_schema.h" |
36 | | #include "runtime/memory/mem_tracker.h" |
37 | | #include "runtime/thread_context.h" |
38 | | #include "vec/aggregate_functions/aggregate_function.h" |
39 | | #include "vec/common/arena.h" |
40 | | #include "vec/core/block.h" |
41 | | |
42 | | namespace doris { |
43 | | |
44 | | class Schema; |
45 | | class SlotDescriptor; |
46 | | class TabletSchema; |
47 | | class TupleDescriptor; |
48 | | enum KeysType : int; |
49 | | |
// Stage a memtable is in, from receiving writes to being written out.
enum MemType {
    // The memtable is currently used by the writer to insert blocks into.
    ACTIVE = 0,
    // The memtable has finished writing blocks and sits in the queue waiting for flush.
    WRITE_FINISHED = 1,
    // The memtable is being flushed, i.e. its segment is being written to disk.
    FLUSH = 2
};
54 | | |
// Handle for one row, identified by its position in _input_mutable_block,
// plus the (optional) location of that row's aggregate states.
struct RowInBlock {
    // Row position inside the block.
    size_t _row_pos;
    // Base address of the memory holding this row's aggregate states.
    char* _agg_mem = nullptr;
    // Table of byte offsets into _agg_mem, indexed by the argument of agg_places().
    size_t* _agg_state_offset = nullptr;
    // True between init_agg_places() and remove_init_agg().
    bool _has_init_agg = false;

    RowInBlock(size_t row) : _row_pos(row) {}

    // Records where the aggregate states live and marks them as initialised.
    // Neither pointer is copied or owned; both are simply stored.
    void init_agg_places(char* agg_mem, size_t* agg_state_offset) {
        _agg_mem = agg_mem;
        _agg_state_offset = agg_state_offset;
        _has_init_agg = true;
    }

    // Address of the aggregate state whose offset is stored at index `offset`.
    // No null or bounds checks are performed here.
    char* agg_places(size_t offset) const {
        const size_t byte_offset = _agg_state_offset[offset];
        return &_agg_mem[byte_offset];
    }

    bool has_init_agg() const { return _has_init_agg; }

    // Only clears the flag; the stored pointers are left as they are.
    void remove_init_agg() { _has_init_agg = false; }
};
76 | | |
// One byte flag per index in [begin, end); every flag starts at 1 and can be
// cleared/set through operator[]. Tie::Iter walks the flags and reports maximal
// runs of consecutive 1-flags as index ranges.
// NOTE(review): presumably flag i == 1 means "row i still ties with row i-1"
// during the column-by-column sort — confirm against _sort_one_column.
class Tie {
public:
    // Forward cursor over the ranges described by a Tie.
    // It keeps a reference to the Tie, so the Tie must outlive the iterator.
    class Iter {
    public:
        // Starts with the empty range [begin, begin) so that left()/right() are
        // well defined even before the first successful next(). The flag at
        // `begin` itself is never inspected: scanning starts at begin + 1.
        Iter(Tie& tie)
                : _tie(tie), _left(tie._begin), _right(tie._begin), _next(tie._begin + 1) {}

        // Bounds of the range produced by the last successful next().
        // Every flag in (left, right) is 1; right is the index of the first 0
        // flag after the run, or the Tie's end.
        size_t left() const { return _left; }
        size_t right() const { return _right; }

        // Advances to the next range.
        // Returns false when there are no more ranges; left()/right() then keep
        // their previous values.
        bool next() {
            if (_next >= _tie._end) {
                return false;
            }
            // Skip ahead to the first set flag ...
            _next = _find(1, _next);
            if (_next >= _tie._end) {
                return false;
            }
            // ... the range begins one index before it ...
            _left = _next - 1;
            // ... and runs until the next cleared flag (or the end).
            _next = _find(0, _next);
            _right = _next;
            return true;
        }

    private:
        // Index of the first flag equal to `value` in [start, end),
        // or end when there is none (or when start is already past the end).
        size_t _find(uint8_t value, size_t start) const {
            if (start >= _tie._end) {
                return start;
            }
            const uint8_t* base = _tie._bits.data();
            const size_t offset = start - _tie._begin;
            const size_t size = _tie._end - start;
            const void* hit = std::memchr(base + offset, value, size);
            if (hit == nullptr) {
                return _tie._end;
            }
            return static_cast<const uint8_t*>(hit) - base + _tie._begin;
        }

    private:
        Tie& _tie;
        size_t _left;
        size_t _right;
        size_t _next;
    };

public:
    // Covers indices [begin, end) with all flags set to 1.
    // `end` must not be smaller than `begin`.
    Tie(size_t begin, size_t end) : _begin(begin), _end(end), _bits(end - begin, 1) {}

    // Flag for absolute index `i` (not relative to begin). No bounds checking.
    uint8_t operator[](int i) const { return _bits[i - _begin]; }
    uint8_t& operator[](int i) { return _bits[i - _begin]; }

    // Fresh iterator positioned before the first range.
    Iter iter() { return Iter(*this); }

private:
    const size_t _begin;
    const size_t _end;
    std::vector<uint8_t> _bits;
};
134 | | |
135 | | class RowInBlockComparator { |
136 | | public: |
137 | | RowInBlockComparator(std::shared_ptr<TabletSchema> tablet_schema) |
138 | 12 | : _tablet_schema(tablet_schema) {} |
139 | | // call set_block before operator(). |
140 | | // only first time insert block to create _input_mutable_block, |
141 | | // so can not Comparator of construct to set pblock |
142 | 21 | void set_block(vectorized::MutableBlock* pblock) { _pblock = pblock; } |
143 | | int operator()(const RowInBlock* left, const RowInBlock* right) const; |
144 | | |
145 | | private: |
146 | | std::shared_ptr<TabletSchema> _tablet_schema; |
147 | | vectorized::MutableBlock* _pblock = nullptr; // corresponds to Memtable::_input_mutable_block |
148 | | }; |
149 | | |
// Counters and timers collected for a memtable; instances can be summed with +=.
class MemTableStat {
public:
    // Adds every field of `stat` onto the matching field of this object
    // and returns *this.
    MemTableStat& operator+=(const MemTableStat& stat) {
        raw_rows.fetch_add(stat.raw_rows.load());
        merged_rows.fetch_add(stat.merged_rows.load());
        sort_ns = sort_ns + stat.sort_ns;
        agg_ns = agg_ns + stat.agg_ns;
        put_into_output_ns = put_into_output_ns + stat.put_into_output_ns;
        duration_ns = duration_ns + stat.duration_ns;
        sort_times.fetch_add(stat.sort_times.load());
        agg_times.fetch_add(stat.agg_times.load());

        return *this;
    }

    // Row and invocation counters are atomic; the *_ns timers are plain integers.
    std::atomic<int64_t> raw_rows {0};
    std::atomic<int64_t> merged_rows {0};
    int64_t sort_ns {0};
    int64_t agg_ns {0};
    int64_t put_into_output_ns {0};
    int64_t duration_ns {0};
    std::atomic<int64_t> sort_times {0};
    std::atomic<int64_t> agg_times {0};
};
174 | | |
175 | | class MemTable { |
176 | | public: |
177 | | MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
178 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
179 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info); |
180 | | ~MemTable(); |
181 | | |
182 | 9 | int64_t tablet_id() const { return _tablet_id; } |
183 | 37 | size_t memory_usage() const { return _mem_tracker->consumption(); } |
184 | | // insert tuple from (row_pos) to (row_pos+num_rows) |
185 | | Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs); |
186 | | |
187 | | void shrink_memtable_by_agg(); |
188 | | |
189 | | bool need_flush() const; |
190 | | |
191 | | bool need_agg() const; |
192 | | |
193 | | Status to_block(std::unique_ptr<vectorized::Block>* res); |
194 | | |
195 | 12 | bool empty() const { return _input_mutable_block.rows() == 0; } |
196 | | |
197 | 9 | const MemTableStat& stat() { return _stat; } |
198 | | |
199 | 0 | QueryThreadContext query_thread_context() { return _query_thread_context; } |
200 | | |
201 | 0 | std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; } |
202 | | |
203 | 9 | void set_flush_success() { _is_flush_success = true; } |
204 | | |
205 | 0 | MemType get_mem_type() { return _mem_type; } |
206 | | |
207 | 21 | void update_mem_type(MemType memtype) { _mem_type = memtype; } |
208 | | |
209 | | private: |
210 | | // for vectorized |
211 | | template <bool has_skip_bitmap_col> |
212 | | void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row, |
213 | | RowInBlock* row_in_skiplist); |
214 | | |
215 | | // Used to wrapped by to_block to do exception handle logic |
216 | | Status _to_block(std::unique_ptr<vectorized::Block>* res); |
217 | | |
218 | | private: |
219 | | std::atomic<MemType> _mem_type; |
220 | | int64_t _tablet_id; |
221 | | bool _enable_unique_key_mow = false; |
222 | | bool _is_flush_success = false; |
223 | | UniqueKeyUpdateModePB _partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
224 | | const KeysType _keys_type; |
225 | | std::shared_ptr<TabletSchema> _tablet_schema; |
226 | | |
227 | | std::shared_ptr<RowInBlockComparator> _vec_row_comparator; |
228 | | |
229 | | QueryThreadContext _query_thread_context; |
230 | | |
231 | | std::shared_ptr<MemTracker> _mem_tracker; |
232 | | // Only the rows will be inserted into block can allocate memory from _arena. |
233 | | // In this way, we can make MemTable::memory_usage() to be more accurate, and eventually |
234 | | // reduce the number of segment files that are generated by current load |
235 | | std::unique_ptr<vectorized::Arena> _arena; |
236 | | |
237 | | void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
238 | | const TupleDescriptor* tuple_desc); |
239 | | std::vector<int> _column_offset; |
240 | | |
241 | | // Number of rows inserted to this memtable. |
242 | | // This is not the rows in this memtable, because rows may be merged |
243 | | // in unique or aggregate key model. |
244 | | MemTableStat _stat; |
245 | | |
246 | | //for vectorized |
247 | | vectorized::MutableBlock _input_mutable_block; |
248 | | vectorized::MutableBlock _output_mutable_block; |
249 | | size_t _last_sorted_pos = 0; |
250 | | |
251 | | //return number of same keys |
252 | | size_t _sort(); |
253 | | Status _sort_by_cluster_keys(); |
254 | | void _sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, |
255 | | std::function<int(const RowInBlock*, const RowInBlock*)> cmp); |
256 | | template <bool is_final> |
257 | | void _finalize_one_row(RowInBlock* row, const vectorized::ColumnsWithTypeAndName& block_data, |
258 | | int row_pos); |
259 | | template <bool is_final, bool has_skip_bitmap_col = false> |
260 | | void _aggregate(); |
261 | | Status _put_into_output(vectorized::Block& in_block); |
262 | | bool _is_first_insertion; |
263 | | |
264 | | void _init_agg_functions(const vectorized::Block* block); |
265 | | std::vector<vectorized::AggregateFunctionPtr> _agg_functions; |
266 | | std::vector<size_t> _offsets_of_aggregate_states; |
267 | | size_t _total_size_of_aggregate_states; |
268 | | std::vector<RowInBlock*> _row_in_blocks; |
269 | | |
270 | | size_t _num_columns; |
271 | | int32_t _seq_col_idx_in_block = -1; |
272 | | int32_t _skip_bitmap_col_idx {-1}; |
273 | | int32_t _seq_col_unique_id {-1}; |
274 | | |
275 | | bool _is_partial_update_and_auto_inc = false; |
276 | | }; // class MemTable |
277 | | |
278 | | } // namespace doris |