/root/doris/be/src/olap/memtable.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/memtable.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <pdqsort.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <limits> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "bvar/bvar.h" |
30 | | #include "common/config.h" |
31 | | #include "olap/memtable_memory_limiter.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/tablet_schema.h" |
34 | | #include "runtime/descriptors.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "tablet_meta.h" |
38 | | #include "util/debug_points.h" |
39 | | #include "util/runtime_profile.h" |
40 | | #include "util/stopwatch.hpp" |
41 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
42 | | #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
43 | | #include "vec/columns/column.h" |
44 | | |
45 | | namespace doris { |
46 | | |
// Process-wide bvar counter of live MemTable objects: the constructor adds 1 and the
// destructor adds -1.
bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt");
// Process-wide bvar tracking bytes allocated by memtable input blocks: insert() adds the
// growth of `_input_mutable_block`, and _to_block() / the destructor subtract what remains.
bvar::Adder<int64_t> g_memtable_input_block_allocated_size("memtable_input_block_allocated_size");

using namespace ErrorCode;
51 | | |
52 | | MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
53 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
54 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, |
55 | | const std::shared_ptr<MemTracker>& insert_mem_tracker, |
56 | | const std::shared_ptr<MemTracker>& flush_mem_tracker) |
57 | | : _tablet_id(tablet_id), |
58 | | _enable_unique_key_mow(enable_unique_key_mow), |
59 | | _keys_type(tablet_schema->keys_type()), |
60 | | _tablet_schema(tablet_schema), |
61 | | _insert_mem_tracker(insert_mem_tracker), |
62 | | _flush_mem_tracker(flush_mem_tracker), |
63 | | _is_first_insertion(true), |
64 | | _agg_functions(tablet_schema->num_columns()), |
65 | | _offsets_of_aggregate_states(tablet_schema->num_columns()), |
66 | | _total_size_of_aggregate_states(0), |
67 | 11 | _mem_usage(0) { |
68 | 11 | g_memtable_cnt << 1; |
69 | 11 | _query_thread_context.init(); |
70 | 11 | _arena = std::make_unique<vectorized::Arena>(); |
71 | 11 | _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema); |
72 | 11 | _num_columns = _tablet_schema->num_columns(); |
73 | 11 | if (partial_update_info != nullptr) { |
74 | 11 | _is_partial_update = partial_update_info->is_partial_update; |
75 | 11 | if (_is_partial_update) { |
76 | 0 | _num_columns = partial_update_info->partial_update_input_columns.size(); |
77 | 0 | if (partial_update_info->is_schema_contains_auto_inc_column && |
78 | 0 | !partial_update_info->is_input_columns_contains_auto_inc_column) { |
79 | 0 | _is_partial_update_and_auto_inc = true; |
80 | 0 | _num_columns += 1; |
81 | 0 | } |
82 | 0 | } |
83 | 11 | } |
84 | | // TODO: Support ZOrderComparator in the future |
85 | 11 | _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); |
86 | 11 | } |
87 | | |
88 | | void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
89 | 11 | const TupleDescriptor* tuple_desc) { |
90 | 83 | for (auto slot_desc : *slot_descs) { |
91 | 83 | const auto& slots = tuple_desc->slots(); |
92 | 612 | for (int j = 0; j < slots.size(); ++j) { |
93 | 612 | if (slot_desc->id() == slots[j]->id()) { |
94 | 83 | _column_offset.emplace_back(j); |
95 | 83 | break; |
96 | 83 | } |
97 | 612 | } |
98 | 83 | } |
99 | 11 | if (_is_partial_update_and_auto_inc) { |
100 | 0 | _column_offset.emplace_back(_column_offset.size()); |
101 | 0 | } |
102 | 11 | } |
103 | | |
104 | 8 | void MemTable::_init_agg_functions(const vectorized::Block* block) { |
105 | 35 | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
106 | 27 | vectorized::AggregateFunctionPtr function; |
107 | 27 | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) { |
108 | | // In such table, non-key column's aggregation type is NONE, so we need to construct |
109 | | // the aggregate function manually. |
110 | 6 | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
111 | 6 | "replace_load", {block->get_data_type(cid)}, |
112 | 6 | block->get_data_type(cid)->is_nullable()); |
113 | 21 | } else { |
114 | 21 | function = |
115 | 21 | _tablet_schema->column(cid).get_aggregate_function(vectorized::AGG_LOAD_SUFFIX); |
116 | 21 | if (function == nullptr) { |
117 | 0 | LOG(WARNING) << "column get aggregate function failed, column=" |
118 | 0 | << _tablet_schema->column(cid).name(); |
119 | 0 | } |
120 | 21 | } |
121 | | |
122 | 27 | DCHECK(function != nullptr); |
123 | 27 | _agg_functions[cid] = function; |
124 | 27 | } |
125 | | |
126 | 35 | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
127 | 27 | _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states; |
128 | 27 | _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data(); |
129 | | |
130 | | // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. |
131 | 27 | if (cid + 1 < _num_columns) { |
132 | 20 | size_t alignment_of_next_state = _agg_functions[cid + 1]->align_of_data(); |
133 | | |
134 | | /// Extend total_size to next alignment requirement |
135 | | /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. |
136 | 20 | _total_size_of_aggregate_states = |
137 | 20 | (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
138 | 20 | alignment_of_next_state * alignment_of_next_state; |
139 | 20 | } |
140 | 27 | } |
141 | 8 | } |
142 | | |
// Tears the memtable down.
//
// Steps, in order:
// - Switches the thread's memory tracker to the query tracker for the duration of the
//   destructor (per the macro name; presumably so the frees below are charged there).
// - Updates the process-wide bvars.
// - Destroys any aggregate states still attached to rows.
// - Deletes the heap-allocated RowInBlock objects.
// - Returns the insert-memory charge and zeroes the flush tracker.
//
// The members are then reset explicitly rather than left to implicit member destruction,
// presumably so their memory is released while the tracker switch above is still in effect.
MemTable::~MemTable() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
    g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
    g_memtable_cnt << -1;
    if (_keys_type != KeysType::DUP_KEYS) {
        for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) {
            if (!(*it)->has_init_agg()) {
                continue;
            }
            // We should release agg_places here, because they are not released when a
            // load is canceled.
            for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
                auto function = _agg_functions[i];
                DCHECK(function != nullptr);
                function->destroy((*it)->agg_places(i));
            }
        }
    }
    // RowInBlock objects are allocated with `new` in insert(); free them here.
    std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
    _insert_mem_tracker->release(_mem_usage);
    _flush_mem_tracker->set_consumption(0);
    DCHECK_EQ(_insert_mem_tracker->consumption(), 0)
            << std::endl
            << MemTracker::log_usage(_insert_mem_tracker->make_snapshot());
    DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
    _arena.reset();
    _agg_buffer_pool.clear();
    _vec_row_comparator.reset();
    _row_in_blocks.clear();
    _agg_functions.clear();
    _input_mutable_block.clear();
    _output_mutable_block.clear();
}
176 | | |
177 | 2 | int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { |
178 | 2 | return _pblock->compare_at(left->_row_pos, right->_row_pos, _tablet_schema->num_key_columns(), |
179 | 2 | *_pblock, -1); |
180 | 2 | } |
181 | | |
182 | | Status MemTable::insert(const vectorized::Block* input_block, |
183 | 12 | const std::vector<uint32_t>& row_idxs) { |
184 | 12 | if (_is_first_insertion) { |
185 | 8 | _is_first_insertion = false; |
186 | 8 | auto clone_block = input_block->clone_without_columns(&_column_offset); |
187 | 8 | _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
188 | 8 | _vec_row_comparator->set_block(&_input_mutable_block); |
189 | 8 | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
190 | 8 | if (_keys_type != KeysType::DUP_KEYS) { |
191 | | // there may be additional intermediate columns in input_block |
192 | | // we only need columns indicated by column offset in the output |
193 | 8 | _init_agg_functions(&clone_block); |
194 | 8 | } |
195 | 8 | if (_tablet_schema->has_sequence_col()) { |
196 | 6 | if (_is_partial_update) { |
197 | | // for unique key partial update, sequence column index in block |
198 | | // may be different with the index in `_tablet_schema` |
199 | 0 | for (size_t i = 0; i < clone_block.columns(); i++) { |
200 | 0 | if (clone_block.get_by_position(i).name == SEQUENCE_COL) { |
201 | 0 | _seq_col_idx_in_block = i; |
202 | 0 | break; |
203 | 0 | } |
204 | 0 | } |
205 | 6 | } else { |
206 | 6 | _seq_col_idx_in_block = _tablet_schema->sequence_col_idx(); |
207 | 6 | } |
208 | 6 | } |
209 | 8 | } |
210 | | |
211 | 12 | auto num_rows = row_idxs.size(); |
212 | 12 | size_t cursor_in_mutableblock = _input_mutable_block.rows(); |
213 | 12 | auto block_size0 = _input_mutable_block.allocated_bytes(); |
214 | 12 | RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), |
215 | 12 | row_idxs.data() + num_rows, &_column_offset)); |
216 | 12 | auto block_size1 = _input_mutable_block.allocated_bytes(); |
217 | 12 | g_memtable_input_block_allocated_size << block_size1 - block_size0; |
218 | 12 | auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() * |
219 | 12 | config::memtable_insert_memory_ratio); |
220 | 12 | _mem_usage += input_size; |
221 | 12 | _insert_mem_tracker->consume(input_size); |
222 | 24 | for (int i = 0; i < num_rows; i++) { |
223 | 12 | _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); |
224 | 12 | } |
225 | | |
226 | 12 | _stat.raw_rows += num_rows; |
227 | 12 | return Status::OK(); |
228 | 12 | } |
229 | | |
230 | | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, |
231 | 2 | RowInBlock* src_row, RowInBlock* dst_row) { |
232 | 2 | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { |
233 | 2 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); |
234 | 2 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); |
235 | 2 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); |
236 | | // dst sequence column larger than src, don't need to update |
237 | 2 | if (res > 0) { |
238 | 2 | return; |
239 | 2 | } |
240 | | // need to update the row pos in dst row to the src row pos when has |
241 | | // sequence column |
242 | 0 | dst_row->_row_pos = src_row->_row_pos; |
243 | 0 | } |
244 | | // dst is non-sequence row, or dst sequence is smaller |
245 | 0 | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
246 | 0 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); |
247 | 0 | _agg_functions[cid]->add(dst_row->agg_places(cid), |
248 | 0 | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
249 | 0 | src_row->_row_pos, _arena.get()); |
250 | 0 | } |
251 | 0 | } |
252 | 6 | Status MemTable::_put_into_output(vectorized::Block& in_block) { |
253 | 6 | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
254 | 6 | std::vector<uint32_t> row_pos_vec; |
255 | 6 | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
256 | 6 | row_pos_vec.reserve(in_block.rows()); |
257 | 14 | for (int i = 0; i < _row_in_blocks.size(); i++) { |
258 | 8 | row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); |
259 | 8 | } |
260 | 6 | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
261 | 6 | row_pos_vec.data() + in_block.rows()); |
262 | 6 | } |
263 | | |
// Brings `_row_in_blocks` into key order.
//
// Only the rows appended since the previous sort, [_last_sorted_pos, size()), are sorted here:
// 1. Column by column on the key columns, using a Tie that tracks which neighbours are
//    still equal.
// 2. Rows equal on every key column are ordered by `_row_pos`: descending for DUP_KEYS,
//    ascending otherwise.
// 3. The sorted new rows are merged with the already-sorted prefix via std::inplace_merge.
//
// Returns a count that grows whenever rows with equal keys are encountered: the sizes of
// equal-key ranges among the new rows, plus equal-key comparisons during the merge. Callers
// only test it against zero to decide whether aggregation is needed.
// NOTE(review): the reason DUP_KEYS ties use descending `_row_pos` is not visible here.
size_t MemTable::_sort() {
    SCOPED_RAW_TIMER(&_stat.sort_ns);
    _stat.sort_times++;
    size_t same_keys_num = 0;
    // sort new rows
    Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
    for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) {
        auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
            return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
        };
        _sort_one_column(_row_in_blocks, tie, cmp);
    }
    bool is_dup = (_keys_type == KeysType::DUP_KEYS);
    // sort extra round by _row_pos to make the sort stable
    auto iter = tie.iter();
    while (iter.next()) {
        pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
                std::next(_row_in_blocks.begin(), iter.right()),
                [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
                    return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
                });
        same_keys_num += iter.right() - iter.left();
    }
    // merge new rows and old rows
    _vec_row_comparator->set_block(&_input_mutable_block);
    // Strict-weak-ordering comparator for the merge: key order first, then the same
    // `_row_pos` tie-break as above. Every equal-key comparison bumps `same_keys_num`.
    auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
                                                   const RowInBlock* r) -> bool {
        auto value = (*(this->_vec_row_comparator))(l, r);
        if (value == 0) {
            same_keys_num++;
            return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos;
        } else {
            return value < 0;
        }
    };
    auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
    std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func);
    // Everything is sorted now; the next call only has to sort rows appended after this.
    _last_sorted_pos = _row_in_blocks.size();
    return same_keys_num;
}
304 | | |
305 | 0 | Status MemTable::_sort_by_cluster_keys() { |
306 | 0 | SCOPED_RAW_TIMER(&_stat.sort_ns); |
307 | 0 | _stat.sort_times++; |
308 | | // sort all rows |
309 | 0 | vectorized::Block in_block = _output_mutable_block.to_block(); |
310 | 0 | vectorized::MutableBlock mutable_block = |
311 | 0 | vectorized::MutableBlock::build_mutable_block(&in_block); |
312 | 0 | auto clone_block = in_block.clone_without_columns(); |
313 | 0 | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
314 | |
|
315 | 0 | std::vector<RowInBlock*> row_in_blocks; |
316 | 0 | std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) { |
317 | 0 | std::for_each(row_in_blocks.begin(), row_in_blocks.end(), |
318 | 0 | std::default_delete<RowInBlock>()); |
319 | 0 | }); |
320 | 0 | row_in_blocks.reserve(mutable_block.rows()); |
321 | 0 | for (size_t i = 0; i < mutable_block.rows(); i++) { |
322 | 0 | row_in_blocks.emplace_back(new RowInBlock {i}); |
323 | 0 | } |
324 | 0 | Tie tie = Tie(0, mutable_block.rows()); |
325 | |
|
326 | 0 | for (auto i : _tablet_schema->cluster_key_idxes()) { |
327 | 0 | auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { |
328 | 0 | return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); |
329 | 0 | }; |
330 | 0 | _sort_one_column(row_in_blocks, tie, cmp); |
331 | 0 | } |
332 | | |
333 | | // sort extra round by _row_pos to make the sort stable |
334 | 0 | auto iter = tie.iter(); |
335 | 0 | while (iter.next()) { |
336 | 0 | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
337 | 0 | std::next(row_in_blocks.begin(), iter.right()), |
338 | 0 | [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { |
339 | 0 | return lhs->_row_pos < rhs->_row_pos; |
340 | 0 | }); |
341 | 0 | } |
342 | |
|
343 | 0 | in_block = mutable_block.to_block(); |
344 | 0 | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
345 | 0 | std::vector<uint32_t> row_pos_vec; |
346 | 0 | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
347 | 0 | row_pos_vec.reserve(in_block.rows()); |
348 | 0 | for (int i = 0; i < row_in_blocks.size(); i++) { |
349 | 0 | row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); |
350 | 0 | } |
351 | 0 | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
352 | 0 | row_pos_vec.data() + in_block.rows(), &_column_offset); |
353 | 0 | } |
354 | | |
355 | | void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, |
356 | 26 | std::function<int(const RowInBlock*, const RowInBlock*)> cmp) { |
357 | 26 | auto iter = tie.iter(); |
358 | 32 | while (iter.next()) { |
359 | 6 | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
360 | 6 | std::next(row_in_blocks.begin(), iter.right()), |
361 | 6 | [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; }); |
362 | 6 | tie[iter.left()] = 0; |
363 | 12 | for (int i = iter.left() + 1; i < iter.right(); i++) { |
364 | 6 | tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0); |
365 | 6 | } |
366 | 6 | } |
367 | 26 | } |
368 | | |
// Emits one de-duplicated row into `_output_mutable_block`.
//
// Key columns are always copied from `block_data` at `row->_row_pos`. For the value columns:
// - If the row carries aggregate states (`has_init_agg()`), each state's result is appended
//   with `insert_result_into`.
// - Otherwise the value columns are copied from `block_data` at `row->_row_pos` unchanged.
//
// is_final == true:
//   The aggregate states are destroyed and the row is marked as having none.
// is_final == false:
//   The states are reset and then re-seeded with the value just appended to the output
//   block at index `row_pos`, so aggregation can continue in a later pass. Afterwards
//   `row->_row_pos` is set to `row_pos`.
//
// @param row        the surviving row for one key.
// @param block_data columns of the block that `row->_row_pos` currently indexes.
// @param row_pos    index of this row in `_output_mutable_block`; `_aggregate` passes the
//                   running count of distinct rows emitted so far. Only used when
//                   is_final == false.
template <bool is_final>
void MemTable::_finalize_one_row(RowInBlock* row,
                                 const vectorized::ColumnsWithTypeAndName& block_data,
                                 int row_pos) {
    // move key columns
    for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) {
        _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(),
                                                                     row->_row_pos);
    }
    if (row->has_init_agg()) {
        // get value columns from agg_places
        for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
            auto function = _agg_functions[i];
            auto* agg_place = row->agg_places(i);
            auto* col_ptr = _output_mutable_block.get_column_by_position(i).get();
            function->insert_result_into(agg_place, *col_ptr);

            if constexpr (is_final) {
                function->destroy(agg_place);
            } else {
                function->reset(agg_place);
            }
        }

        if constexpr (is_final) {
            row->remove_init_agg();
        } else {
            // Re-seed each reset state with the aggregated value just written at `row_pos`.
            for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
                auto function = _agg_functions[i];
                auto* agg_place = row->agg_places(i);
                auto* col_ptr = _output_mutable_block.get_column_by_position(i).get();
                function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
                              row_pos, _arena.get());
            }
        }
    } else {
        // move columns for rows do not need agg
        for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) {
            _output_mutable_block.get_column_by_position(i)->insert_from(
                    *block_data[i].column.get(), row->_row_pos);
        }
    }
    if constexpr (!is_final) {
        // The output block becomes the next input block, so point the row at its new slot.
        row->_row_pos = row_pos;
    }
}
415 | | |
416 | | template <bool is_final> |
417 | 2 | void MemTable::_aggregate() { |
418 | 2 | SCOPED_RAW_TIMER(&_stat.agg_ns); |
419 | 2 | _stat.agg_times++; |
420 | 2 | vectorized::Block in_block = _input_mutable_block.to_block(); |
421 | 2 | vectorized::MutableBlock mutable_block = |
422 | 2 | vectorized::MutableBlock::build_mutable_block(&in_block); |
423 | 2 | _vec_row_comparator->set_block(&mutable_block); |
424 | 2 | auto& block_data = in_block.get_columns_with_type_and_name(); |
425 | 2 | std::vector<RowInBlock*> temp_row_in_blocks; |
426 | 2 | temp_row_in_blocks.reserve(_last_sorted_pos); |
427 | 2 | RowInBlock* prev_row = nullptr; |
428 | 2 | int row_pos = -1; |
429 | | //only init agg if needed |
430 | 6 | for (int i = 0; i < _row_in_blocks.size(); i++) { |
431 | 4 | if (!temp_row_in_blocks.empty() && |
432 | 4 | (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { |
433 | 2 | if (!prev_row->has_init_agg()) { |
434 | 2 | prev_row->init_agg_places( |
435 | 2 | _arena->aligned_alloc(_total_size_of_aggregate_states, 16), |
436 | 2 | _offsets_of_aggregate_states.data()); |
437 | 8 | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { |
438 | 6 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); |
439 | 6 | auto data = prev_row->agg_places(cid); |
440 | 6 | _agg_functions[cid]->create(data); |
441 | 6 | _agg_functions[cid]->add( |
442 | 6 | data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
443 | 6 | prev_row->_row_pos, _arena.get()); |
444 | 6 | } |
445 | 2 | } |
446 | 2 | _stat.merged_rows++; |
447 | 2 | _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], prev_row); |
448 | 2 | } else { |
449 | 2 | prev_row = _row_in_blocks[i]; |
450 | 2 | if (!temp_row_in_blocks.empty()) { |
451 | | // no more rows to merge for prev row, finalize it |
452 | 0 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
453 | 0 | } |
454 | 2 | temp_row_in_blocks.push_back(prev_row); |
455 | 2 | row_pos++; |
456 | 2 | } |
457 | 4 | } |
458 | 2 | if (!temp_row_in_blocks.empty()) { |
459 | | // finalize the last low |
460 | 2 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
461 | 2 | } |
462 | 2 | if constexpr (!is_final) { |
463 | | // if is not final, we collect the agg results to input_block and then continue to insert |
464 | 0 | size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); |
465 | | // flush will not run here, so will not duplicate `_flush_mem_tracker` |
466 | 0 | _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); |
467 | 0 | _mem_usage = shrunked_after_agg; |
468 | 0 | _input_mutable_block.swap(_output_mutable_block); |
469 | | //TODO(weixang):opt here. |
470 | 0 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); |
471 | 0 | _output_mutable_block = |
472 | 0 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); |
473 | 0 | _output_mutable_block.clear_column_data(); |
474 | 0 | _row_in_blocks = temp_row_in_blocks; |
475 | 0 | _last_sorted_pos = _row_in_blocks.size(); |
476 | 0 | } |
477 | 2 | } Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0EEEvv _ZN5doris8MemTable10_aggregateILb1EEEvv Line | Count | Source | 417 | 2 | void MemTable::_aggregate() { | 418 | 2 | SCOPED_RAW_TIMER(&_stat.agg_ns); | 419 | 2 | _stat.agg_times++; | 420 | 2 | vectorized::Block in_block = _input_mutable_block.to_block(); | 421 | 2 | vectorized::MutableBlock mutable_block = | 422 | 2 | vectorized::MutableBlock::build_mutable_block(&in_block); | 423 | 2 | _vec_row_comparator->set_block(&mutable_block); | 424 | 2 | auto& block_data = in_block.get_columns_with_type_and_name(); | 425 | 2 | std::vector<RowInBlock*> temp_row_in_blocks; | 426 | 2 | temp_row_in_blocks.reserve(_last_sorted_pos); | 427 | 2 | RowInBlock* prev_row = nullptr; | 428 | 2 | int row_pos = -1; | 429 | | //only init agg if needed | 430 | 6 | for (int i = 0; i < _row_in_blocks.size(); i++) { | 431 | 4 | if (!temp_row_in_blocks.empty() && | 432 | 4 | (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { | 433 | 2 | if (!prev_row->has_init_agg()) { | 434 | 2 | prev_row->init_agg_places( | 435 | 2 | _arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 436 | 2 | _offsets_of_aggregate_states.data()); | 437 | 8 | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 438 | 6 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); | 439 | 6 | auto data = prev_row->agg_places(cid); | 440 | 6 | _agg_functions[cid]->create(data); | 441 | 6 | _agg_functions[cid]->add( | 442 | 6 | data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 443 | 6 | prev_row->_row_pos, _arena.get()); | 444 | 6 | } | 445 | 2 | } | 446 | 2 | _stat.merged_rows++; | 447 | 2 | _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], prev_row); | 448 | 2 | } else { | 449 | 2 | prev_row = _row_in_blocks[i]; | 450 | 2 | if (!temp_row_in_blocks.empty()) { | 451 | | // no more rows to merge for prev row, finalize it | 452 | 0 | 
_finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 453 | 0 | } | 454 | 2 | temp_row_in_blocks.push_back(prev_row); | 455 | 2 | row_pos++; | 456 | 2 | } | 457 | 4 | } | 458 | 2 | if (!temp_row_in_blocks.empty()) { | 459 | | // finalize the last low | 460 | 2 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 461 | 2 | } | 462 | 2 | if constexpr (!is_final) { | 463 | | // if is not final, we collect the agg results to input_block and then continue to insert | 464 | 2 | size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); | 465 | | // flush will not run here, so will not duplicate `_flush_mem_tracker` | 466 | 2 | _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); | 467 | 2 | _mem_usage = shrunked_after_agg; | 468 | 2 | _input_mutable_block.swap(_output_mutable_block); | 469 | | //TODO(weixang):opt here. | 470 | 2 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 471 | 2 | _output_mutable_block = | 472 | 2 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 473 | 2 | _output_mutable_block.clear_column_data(); | 474 | 2 | _row_in_blocks = temp_row_in_blocks; | 475 | 2 | _last_sorted_pos = _row_in_blocks.size(); | 476 | 2 | } | 477 | 2 | } |
|
478 | | |
479 | 0 | void MemTable::shrink_memtable_by_agg() { |
480 | 0 | if (_keys_type == KeysType::DUP_KEYS) { |
481 | 0 | return; |
482 | 0 | } |
483 | 0 | size_t same_keys_num = _sort(); |
484 | 0 | if (same_keys_num != 0) { |
485 | 0 | _aggregate<false>(); |
486 | 0 | } |
487 | 0 | } |
488 | | |
489 | 12 | bool MemTable::need_flush() const { |
490 | 12 | DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); |
491 | 12 | auto max_size = config::write_buffer_size; |
492 | 12 | if (_is_partial_update) { |
493 | 0 | auto update_columns_size = _num_columns; |
494 | 0 | max_size = max_size * update_columns_size / _tablet_schema->num_columns(); |
495 | 0 | max_size = max_size > 1048576 ? max_size : 1048576; |
496 | 0 | } |
497 | 12 | return memory_usage() >= max_size; |
498 | 12 | } |
499 | | |
500 | 12 | bool MemTable::need_agg() const { |
501 | 12 | if (_keys_type == KeysType::AGG_KEYS) { |
502 | 2 | auto max_size = config::write_buffer_size_for_agg; |
503 | 2 | return memory_usage() >= max_size; |
504 | 2 | } |
505 | 10 | return false; |
506 | 12 | } |
507 | | |
508 | 8 | Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { |
509 | 8 | size_t same_keys_num = _sort(); |
510 | 8 | if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { |
511 | 6 | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
512 | 0 | _output_mutable_block.swap(_input_mutable_block); |
513 | 6 | } else { |
514 | 6 | vectorized::Block in_block = _input_mutable_block.to_block(); |
515 | 6 | RETURN_IF_ERROR(_put_into_output(in_block)); |
516 | 6 | } |
517 | 6 | } else { |
518 | 2 | _aggregate<true>(); |
519 | 2 | } |
520 | 8 | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow && |
521 | 8 | !_tablet_schema->cluster_key_idxes().empty()) { |
522 | 0 | RETURN_IF_ERROR(_sort_by_cluster_keys()); |
523 | 0 | } |
524 | 8 | g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes(); |
525 | 8 | _input_mutable_block.clear(); |
526 | 8 | _insert_mem_tracker->release(_mem_usage); |
527 | 8 | _mem_usage = 0; |
528 | 8 | *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); |
529 | 8 | return Status::OK(); |
530 | 8 | } |
531 | | |
// Public entry point that converts the memtable into a Block in `*res` by running
// `_to_block(res)` through RETURN_IF_ERROR_OR_CATCH_EXCEPTION.
// NOTE(review): per its name the macro presumably also converts exceptions thrown while
// sorting/aggregating into an error Status; its definition is outside this file — confirm.
Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
    return Status::OK();
}
536 | | |
537 | | } // namespace doris |