/root/doris/be/src/olap/memtable.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/memtable.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <pdqsort.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <limits> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "bvar/bvar.h" |
30 | | #include "common/config.h" |
31 | | #include "olap/memtable_memory_limiter.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/tablet_schema.h" |
34 | | #include "runtime/descriptors.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "util/debug_points.h" |
38 | | #include "util/runtime_profile.h" |
39 | | #include "util/stopwatch.hpp" |
40 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
41 | | #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
42 | | #include "vec/columns/column.h" |
43 | | |
44 | | namespace doris { |
45 | | #include "common/compile_check_begin.h" |
46 | | |
47 | | bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt"); |
48 | | bvar::Adder<uint64_t> g_flush_cuz_memtable_full("flush_cuz_memtable_full"); |
49 | | |
50 | | using namespace ErrorCode; |
51 | | |
52 | | MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
53 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
54 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, |
55 | | const std::shared_ptr<ResourceContext>& resource_ctx) |
56 | 54.7k | : _mem_type(MemType::ACTIVE), |
57 | 54.7k | _tablet_id(tablet_id), |
58 | 54.7k | _enable_unique_key_mow(enable_unique_key_mow), |
59 | 54.7k | _keys_type(tablet_schema->keys_type()), |
60 | 54.7k | _tablet_schema(tablet_schema), |
61 | 54.7k | _resource_ctx(resource_ctx), |
62 | 54.7k | _is_first_insertion(true), |
63 | 54.7k | _total_size_of_aggregate_states(0) { |
64 | 54.7k | g_memtable_cnt << 1; |
65 | 54.7k | _mem_tracker = std::make_shared<MemTracker>(); |
66 | 54.7k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
67 | 54.7k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
68 | 54.7k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
69 | | // Initialize vectors here so their memory allocation is tracked by _mem_tracker |
70 | 54.7k | _agg_functions.resize(tablet_schema->num_columns()); |
71 | 54.7k | _offsets_of_aggregate_states.resize(tablet_schema->num_columns()); |
72 | 54.7k | _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema); |
73 | 54.7k | if (partial_update_info != nullptr) { |
74 | 54.6k | _partial_update_mode = partial_update_info->update_mode(); |
75 | 54.6k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
76 | 1.24k | if (partial_update_info->is_schema_contains_auto_inc_column && |
77 | 1.24k | !partial_update_info->is_input_columns_contains_auto_inc_column) { |
78 | 49 | _is_partial_update_and_auto_inc = true; |
79 | 49 | } |
80 | 1.24k | } |
81 | 54.6k | } |
82 | 54.7k | _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); |
83 | | // TODO: Support ZOrderComparator in the future |
84 | 54.7k | _row_in_blocks = std::make_unique<DorisVector<std::shared_ptr<RowInBlock>>>(); |
85 | 54.7k | _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100; |
86 | 54.7k | } |
87 | | |
88 | | void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
89 | 54.5k | const TupleDescriptor* tuple_desc) { |
90 | 590k | for (auto slot_desc : *slot_descs) { |
91 | 590k | const auto& slots = tuple_desc->slots(); |
92 | 7.58M | for (int j = 0; j < slots.size(); ++j) { |
93 | 7.58M | if (slot_desc->id() == slots[j]->id()) { |
94 | 585k | _column_offset.emplace_back(j); |
95 | 585k | break; |
96 | 585k | } |
97 | 7.58M | } |
98 | 590k | } |
99 | 54.5k | if (_is_partial_update_and_auto_inc) { |
100 | 48 | _column_offset.emplace_back(_column_offset.size()); |
101 | 48 | } |
102 | 54.5k | _num_columns = _column_offset.size(); |
103 | 54.5k | } |
104 | | |
105 | 28.8k | void MemTable::_init_agg_functions(const vectorized::Block* block) { |
106 | 28.8k | if (_num_columns > _column_offset.size()) [[unlikely]] { |
107 | 0 | throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, |
108 | 0 | "num_columns {} is greater than block columns {}", _num_columns, |
109 | 0 | _column_offset.size()); |
110 | 0 | } |
111 | 330k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
112 | 301k | vectorized::AggregateFunctionPtr function; |
113 | 301k | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) { |
114 | | // In such table, non-key column's aggregation type is NONE, so we need to construct |
115 | | // the aggregate function manually. |
116 | 248k | if (_skip_bitmap_col_idx != cid) { |
117 | 248k | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
118 | 248k | "replace_load", {block->get_data_type(cid)}, block->get_data_type(cid), |
119 | 248k | block->get_data_type(cid)->is_nullable(), |
120 | 248k | BeExecVersionManager::get_newest_version()); |
121 | 248k | } else { |
122 | 279 | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
123 | 279 | "bitmap_intersect", {block->get_data_type(cid)}, block->get_data_type(cid), |
124 | 279 | false, BeExecVersionManager::get_newest_version()); |
125 | 279 | } |
126 | 248k | } else { |
127 | 52.8k | function = _tablet_schema->column(cid).get_aggregate_function( |
128 | 52.8k | vectorized::AGG_LOAD_SUFFIX, _tablet_schema->column(cid).get_be_exec_version()); |
129 | 52.8k | if (function == nullptr) { |
130 | 0 | LOG(WARNING) << "column get aggregate function failed, column=" |
131 | 0 | << _tablet_schema->column(cid).name(); |
132 | 0 | } |
133 | 52.8k | } |
134 | | |
135 | 301k | DCHECK(function != nullptr); |
136 | 301k | _agg_functions[cid] = function; |
137 | 301k | } |
138 | | |
139 | 329k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
140 | 301k | _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states; |
141 | 301k | _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data(); |
142 | | |
143 | | // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. |
144 | 301k | if (cid + 1 < _num_columns) { |
145 | 272k | size_t alignment_of_next_state = _agg_functions[cid + 1]->align_of_data(); |
146 | | |
147 | | /// Extend total_size to next alignment requirement |
148 | | /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. |
149 | 272k | _total_size_of_aggregate_states = |
150 | 272k | (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
151 | 272k | alignment_of_next_state * alignment_of_next_state; |
152 | 272k | } |
153 | 301k | } |
154 | 28.8k | } |
155 | | |
156 | 54.7k | MemTable::~MemTable() { |
157 | 54.7k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
158 | 54.7k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
159 | 54.7k | { |
160 | 54.7k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
161 | 54.7k | g_memtable_cnt << -1; |
162 | 54.7k | if (_keys_type != KeysType::DUP_KEYS) { |
163 | 7.70M | for (auto it = _row_in_blocks->begin(); it != _row_in_blocks->end(); it++) { |
164 | 7.67M | if (!(*it)->has_init_agg()) { |
165 | 7.67M | continue; |
166 | 7.67M | } |
167 | | // We should release agg_places here, because they are not released when a |
168 | | // load is canceled. |
169 | 52 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
170 | 0 | auto function = _agg_functions[i]; |
171 | 0 | DCHECK(function != nullptr); |
172 | 0 | function->destroy((*it)->agg_places(i)); |
173 | 0 | } |
174 | 52 | } |
175 | 28.8k | } |
176 | | |
177 | 54.7k | _arena.clear(true); |
178 | 54.7k | _vec_row_comparator.reset(); |
179 | 54.7k | _row_in_blocks.reset(); |
180 | 54.7k | _agg_functions.clear(); |
181 | 54.7k | _input_mutable_block.clear(); |
182 | 54.7k | _output_mutable_block.clear(); |
183 | 54.7k | } |
184 | 54.7k | if (_is_flush_success) { |
185 | | // If the memtable is flush success, then its memtracker's consumption should be 0 |
186 | 54.7k | if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { |
187 | 0 | LOG(FATAL) << "memtable flush success but cosumption is not 0, it is " |
188 | 0 | << _mem_tracker->consumption(); |
189 | 0 | } |
190 | 54.7k | } |
191 | 54.7k | } |
192 | | |
193 | 401k | int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { |
194 | 401k | return _pblock->compare_at(left->_row_pos, right->_row_pos, _tablet_schema->num_key_columns(), |
195 | 401k | *_pblock, -1); |
196 | 401k | } |
197 | | |
198 | | Status MemTable::insert(const vectorized::Block* input_block, |
199 | 114k | const DorisVector<uint32_t>& row_idxs) { |
200 | 114k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
201 | 114k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
202 | 114k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
203 | | |
204 | 114k | if (_is_first_insertion) { |
205 | 54.8k | _is_first_insertion = false; |
206 | 54.8k | auto clone_block = input_block->clone_without_columns(&_column_offset); |
207 | 54.8k | _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
208 | 54.8k | _vec_row_comparator->set_block(&_input_mutable_block); |
209 | 54.8k | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
210 | 54.8k | if (_tablet_schema->has_sequence_col()) { |
211 | 1.63k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
212 | | // for unique key fixed partial update, sequence column index in block |
213 | | // may be different with the index in `_tablet_schema` |
214 | 852 | for (int32_t i = 0; i < clone_block.columns(); i++) { |
215 | 665 | if (clone_block.get_by_position(i).name == SEQUENCE_COL) { |
216 | 38 | _seq_col_idx_in_block = i; |
217 | 38 | break; |
218 | 38 | } |
219 | 665 | } |
220 | 1.41k | } else { |
221 | 1.41k | _seq_col_idx_in_block = _tablet_schema->sequence_col_idx(); |
222 | 1.41k | } |
223 | 1.63k | } |
224 | 54.8k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS && |
225 | 54.8k | _tablet_schema->has_skip_bitmap_col()) { |
226 | | // init of _skip_bitmap_col_idx and _delete_sign_col_idx must be before _init_agg_functions() |
227 | 265 | _skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); |
228 | 265 | _delete_sign_col_idx = _tablet_schema->delete_sign_idx(); |
229 | 265 | _delete_sign_col_unique_id = _tablet_schema->column(_delete_sign_col_idx).unique_id(); |
230 | 265 | if (_seq_col_idx_in_block != -1) { |
231 | 25 | _seq_col_unique_id = _tablet_schema->column(_seq_col_idx_in_block).unique_id(); |
232 | 25 | } |
233 | 265 | } |
234 | 54.8k | if (_keys_type != KeysType::DUP_KEYS) { |
235 | | // there may be additional intermediate columns in input_block |
236 | | // we only need columns indicated by column offset in the output |
237 | 28.8k | RETURN_IF_CATCH_EXCEPTION(_init_agg_functions(&clone_block)); |
238 | 28.8k | } |
239 | 54.8k | } |
240 | | |
241 | 114k | auto num_rows = row_idxs.size(); |
242 | 114k | size_t cursor_in_mutableblock = _input_mutable_block.rows(); |
243 | 114k | RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), |
244 | 114k | row_idxs.data() + num_rows, &_column_offset)); |
245 | 35.7M | for (int i = 0; i < num_rows; i++) { |
246 | 35.5M | _row_in_blocks->emplace_back(std::make_shared<RowInBlock>(cursor_in_mutableblock + i)); |
247 | 35.5M | } |
248 | | |
249 | 114k | _stat.raw_rows += num_rows; |
250 | 114k | return Status::OK(); |
251 | 114k | } |
252 | | |
253 | | void MemTable::_aggregate_two_row_with_sequence_map(vectorized::MutableBlock& mutable_block, |
254 | 3 | RowInBlock* src_row, RowInBlock* dst_row) { |
255 | | // for each mapping replace value columns according to the sequence column compare result |
256 | | // for example: a b c d s1 s2 (key:a , s1=>[b,c], s2=>[d]) |
257 | | // src row: 1 4 5 6 8 9 |
258 | | // dst row: 1 2 3 4 7 10 |
259 | | // after aggregate |
260 | | // dst row: 1 4 5 4 8 10 (b,c,s1 will be replaced, d,s2) |
261 | 3 | const auto& seq_map = _tablet_schema->seq_col_idx_to_value_cols_idx(); |
262 | 6 | for (const auto& it : seq_map) { |
263 | 6 | auto sequence = it.first; |
264 | 6 | auto* sequence_col_ptr = mutable_block.mutable_columns()[sequence].get(); |
265 | 6 | auto res = sequence_col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, |
266 | 6 | *sequence_col_ptr, -1); |
267 | 6 | if (res > 0) { |
268 | 2 | continue; |
269 | 2 | } |
270 | 5 | for (auto cid : it.second) { |
271 | 5 | if (cid < _num_columns) { |
272 | 5 | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
273 | 5 | _agg_functions[cid]->add(dst_row->agg_places(cid), |
274 | 5 | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
275 | 5 | src_row->_row_pos, _arena); |
276 | 5 | } |
277 | 5 | } |
278 | 4 | if (sequence < _num_columns) { |
279 | 4 | _agg_functions[sequence]->add( |
280 | 4 | dst_row->agg_places(sequence), |
281 | 4 | const_cast<const doris::vectorized::IColumn**>(&sequence_col_ptr), |
282 | 4 | src_row->_row_pos, _arena); |
283 | | // must use replace column instead of update row_pos |
284 | | // because one row may have multi sequence column |
285 | | // and agg function add method won't change the real column value |
286 | 4 | sequence_col_ptr->replace_column_data(*sequence_col_ptr, src_row->_row_pos, |
287 | 4 | dst_row->_row_pos); |
288 | 4 | } |
289 | 4 | } |
290 | 3 | } |
291 | | |
292 | | template <bool has_skip_bitmap_col> |
293 | | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, |
294 | 81.0k | RowInBlock* src_row, RowInBlock* dst_row) { |
295 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row |
296 | | // both specify the sequence column, or src_row and dst_row both don't specify the |
297 | | // sequence column |
298 | 81.0k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { |
299 | 192 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); |
300 | 192 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); |
301 | 192 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); |
302 | | // dst sequence column larger than src, don't need to update |
303 | 192 | if (res > 0) { |
304 | 91 | return; |
305 | 91 | } |
306 | | // need to update the row pos in dst row to the src row pos when has |
307 | | // sequence column |
308 | 101 | dst_row->_row_pos = src_row->_row_pos; |
309 | 101 | } |
310 | | // dst is non-sequence row, or dst sequence is smaller |
311 | 80.9k | if constexpr (!has_skip_bitmap_col) { |
312 | 76.9k | DCHECK(_skip_bitmap_col_idx == -1); |
313 | 354k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
314 | 277k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
315 | 277k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
316 | 277k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
317 | 277k | src_row->_row_pos, _arena); |
318 | 277k | } |
319 | 76.9k | } else { |
320 | 3.96k | DCHECK(_skip_bitmap_col_idx != -1); |
321 | 3.96k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); |
322 | 3.96k | const BitmapValue& skip_bitmap = |
323 | 3.96k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( |
324 | 3.96k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
325 | 3.96k | ->get_data()[src_row->_row_pos]; |
326 | 72.3k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
327 | 68.4k | const auto& col = _tablet_schema->column(cid); |
328 | 68.4k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { |
329 | 34.2k | continue; |
330 | 34.2k | } |
331 | 34.1k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
332 | 34.1k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
333 | 34.1k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
334 | 34.1k | src_row->_row_pos, _arena); |
335 | 34.1k | } |
336 | 3.96k | } |
337 | 80.9k | } _ZN5doris8MemTable27_aggregate_two_row_in_blockILb0EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 294 | 77.0k | RowInBlock* src_row, RowInBlock* dst_row) { | 295 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 296 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 297 | | // sequence column | 298 | 77.0k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 299 | 192 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 300 | 192 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 301 | 192 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 302 | | // dst sequence column larger than src, don't need to update | 303 | 192 | if (res > 0) { | 304 | 91 | return; | 305 | 91 | } | 306 | | // need to update the row pos in dst row to the src row pos when has | 307 | | // sequence column | 308 | 101 | dst_row->_row_pos = src_row->_row_pos; | 309 | 101 | } | 310 | | // dst is non-sequence row, or dst sequence is smaller | 311 | 76.9k | if constexpr (!has_skip_bitmap_col) { | 312 | 76.9k | DCHECK(_skip_bitmap_col_idx == -1); | 313 | 354k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 314 | 277k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 315 | 277k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 316 | 277k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 317 | 277k | src_row->_row_pos, _arena); | 318 | 277k | } | 319 | | } else { | 320 | | DCHECK(_skip_bitmap_col_idx != -1); | 321 | | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 322 | | const BitmapValue& skip_bitmap = | 323 | | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( | 324 | | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 325 | | ->get_data()[src_row->_row_pos]; | 326 | | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 327 | | const auto& col = _tablet_schema->column(cid); | 328 | | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 329 | | continue; | 330 | | } | 331 | | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 332 | | _agg_functions[cid]->add(dst_row->agg_places(cid), | 333 | | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 334 | | src_row->_row_pos, _arena); | 335 | | } | 336 | | } | 337 | 76.9k | } |
_ZN5doris8MemTable27_aggregate_two_row_in_blockILb1EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 294 | 3.96k | RowInBlock* src_row, RowInBlock* dst_row) { | 295 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 296 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 297 | | // sequence column | 298 | 3.96k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 299 | 0 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 300 | 0 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 301 | 0 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 302 | | // dst sequence column larger than src, don't need to update | 303 | 0 | if (res > 0) { | 304 | 0 | return; | 305 | 0 | } | 306 | | // need to update the row pos in dst row to the src row pos when has | 307 | | // sequence column | 308 | 0 | dst_row->_row_pos = src_row->_row_pos; | 309 | 0 | } | 310 | | // dst is non-sequence row, or dst sequence is smaller | 311 | | if constexpr (!has_skip_bitmap_col) { | 312 | | DCHECK(_skip_bitmap_col_idx == -1); | 313 | | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 314 | | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 315 | | _agg_functions[cid]->add(dst_row->agg_places(cid), | 316 | | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 317 | | src_row->_row_pos, _arena); | 318 | | } | 319 | 3.96k | } else { | 320 | 3.96k | DCHECK(_skip_bitmap_col_idx != -1); | 321 | 3.96k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 322 | 3.96k | const BitmapValue& skip_bitmap = | 323 | 3.96k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( | 324 | 3.96k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 325 | 3.96k | ->get_data()[src_row->_row_pos]; | 326 | 72.3k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 327 | 68.4k | const auto& col = _tablet_schema->column(cid); | 328 | 68.4k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 329 | 34.2k | continue; | 330 | 34.2k | } | 331 | 34.1k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 332 | 34.1k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 333 | 34.1k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 334 | 34.1k | src_row->_row_pos, _arena); | 335 | 34.1k | } | 336 | 3.96k | } | 337 | 3.96k | } |
|
338 | 52.3k | Status MemTable::_put_into_output(vectorized::Block& in_block) { |
339 | 52.3k | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
340 | 52.3k | DorisVector<uint32_t> row_pos_vec; |
341 | 52.3k | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
342 | 52.3k | row_pos_vec.reserve(in_block.rows()); |
343 | 35.1M | for (int i = 0; i < _row_in_blocks->size(); i++) { |
344 | 35.0M | row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos); |
345 | 35.0M | } |
346 | 52.3k | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
347 | 52.3k | row_pos_vec.data() + in_block.rows()); |
348 | 52.3k | } |
349 | | |
350 | 54.6k | size_t MemTable::_sort() { |
351 | 54.6k | SCOPED_RAW_TIMER(&_stat.sort_ns); |
352 | 54.6k | _stat.sort_times++; |
353 | 54.6k | size_t same_keys_num = 0; |
354 | | // sort new rows |
355 | 54.6k | Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size()); |
356 | 209k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { |
357 | 318M | auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int { |
358 | 318M | return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); |
359 | 318M | }; |
360 | 154k | _sort_one_column(*_row_in_blocks, tie, cmp); |
361 | 154k | } |
362 | 54.6k | bool is_dup = (_keys_type == KeysType::DUP_KEYS); |
363 | | // sort extra round by _row_pos to make the sort stable |
364 | 54.6k | auto iter = tie.iter(); |
365 | 242k | while (iter.next()) { |
366 | 187k | pdqsort(std::next(_row_in_blocks->begin(), iter.left()), |
367 | 187k | std::next(_row_in_blocks->begin(), iter.right()), |
368 | 187k | [&is_dup](const std::shared_ptr<RowInBlock>& lhs, |
369 | 64.9M | const std::shared_ptr<RowInBlock>& rhs) -> bool { |
370 | 64.9M | return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; |
371 | 64.9M | }); |
372 | 187k | same_keys_num += iter.right() - iter.left(); |
373 | 187k | } |
374 | | // merge new rows and old rows |
375 | 54.6k | _vec_row_comparator->set_block(&_input_mutable_block); |
376 | 54.6k | auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr<RowInBlock>& l, |
377 | 54.6k | const std::shared_ptr<RowInBlock>& r) -> bool { |
378 | 0 | auto value = (*(this->_vec_row_comparator))(l.get(), r.get()); |
379 | 0 | if (value == 0) { |
380 | 0 | same_keys_num++; |
381 | 0 | return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos; |
382 | 0 | } else { |
383 | 0 | return value < 0; |
384 | 0 | } |
385 | 0 | }; |
386 | 54.6k | auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos); |
387 | 54.6k | std::inplace_merge(_row_in_blocks->begin(), new_row_it, _row_in_blocks->end(), cmp_func); |
388 | 54.6k | _last_sorted_pos = _row_in_blocks->size(); |
389 | 54.6k | return same_keys_num; |
390 | 54.6k | } |
391 | | |
392 | 1.55k | Status MemTable::_sort_by_cluster_keys() { |
393 | 1.55k | SCOPED_RAW_TIMER(&_stat.sort_ns); |
394 | 1.55k | _stat.sort_times++; |
395 | | // sort all rows |
396 | 1.55k | vectorized::Block in_block = _output_mutable_block.to_block(); |
397 | 1.55k | vectorized::MutableBlock mutable_block = |
398 | 1.55k | vectorized::MutableBlock::build_mutable_block(&in_block); |
399 | 1.55k | auto clone_block = in_block.clone_without_columns(); |
400 | 1.55k | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
401 | | |
402 | 1.55k | DorisVector<std::shared_ptr<RowInBlock>> row_in_blocks; |
403 | 1.55k | row_in_blocks.reserve(mutable_block.rows()); |
404 | 814k | for (size_t i = 0; i < mutable_block.rows(); i++) { |
405 | 812k | row_in_blocks.emplace_back(std::make_shared<RowInBlock>(i)); |
406 | 812k | } |
407 | 1.55k | Tie tie = Tie(0, mutable_block.rows()); |
408 | | |
409 | 3.77k | for (auto cid : _tablet_schema->cluster_key_uids()) { |
410 | 3.77k | auto index = _tablet_schema->field_index(cid); |
411 | 3.77k | if (index == -1) { |
412 | 0 | return Status::InternalError("could not find cluster key column with unique_id=" + |
413 | 0 | std::to_string(cid) + " in tablet schema"); |
414 | 0 | } |
415 | 8.55M | auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { |
416 | 8.55M | return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1); |
417 | 8.55M | }; |
418 | 3.77k | _sort_one_column(row_in_blocks, tie, cmp); |
419 | 3.77k | } |
420 | | |
421 | | // sort extra round by _row_pos to make the sort stable |
422 | 1.55k | auto iter = tie.iter(); |
423 | 119k | while (iter.next()) { |
424 | 118k | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
425 | 118k | std::next(row_in_blocks.begin(), iter.right()), |
426 | 118k | [](const std::shared_ptr<RowInBlock>& lhs, const std::shared_ptr<RowInBlock>& rhs) |
427 | 849k | -> bool { return lhs->_row_pos < rhs->_row_pos; }); |
428 | 118k | } |
429 | | |
430 | 1.55k | in_block = mutable_block.to_block(); |
431 | 1.55k | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
432 | 1.55k | DorisVector<uint32_t> row_pos_vec; |
433 | 1.55k | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
434 | 1.55k | row_pos_vec.reserve(in_block.rows()); |
435 | 821k | for (int i = 0; i < row_in_blocks.size(); i++) { |
436 | 820k | row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); |
437 | 820k | } |
438 | 1.55k | std::vector<int> column_offset; |
439 | 21.5k | for (int i = 0; i < _column_offset.size(); ++i) { |
440 | 20.0k | column_offset.emplace_back(i); |
441 | 20.0k | } |
442 | 1.55k | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
443 | 1.55k | row_pos_vec.data() + in_block.rows(), &column_offset); |
444 | 1.55k | } |
445 | | |
446 | | void MemTable::_sort_one_column(DorisVector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie, |
447 | 158k | std::function<int(RowInBlock*, RowInBlock*)> cmp) { |
448 | 158k | auto iter = tie.iter(); |
449 | 3.67M | while (iter.next()) { |
450 | 3.51M | pdqsort(std::next(row_in_blocks.begin(), static_cast<int>(iter.left())), |
451 | 3.51M | std::next(row_in_blocks.begin(), static_cast<int>(iter.right())), |
452 | 287M | [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; }); |
453 | 3.51M | tie[iter.left()] = 0; |
454 | 52.2M | for (auto i = iter.left() + 1; i < iter.right(); i++) { |
455 | 48.7M | tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) == 0); |
456 | 48.7M | } |
457 | 3.51M | } |
458 | 158k | } |
459 | | |
460 | | template <bool is_final> |
461 | | void MemTable::_finalize_one_row(RowInBlock* row, |
462 | | const vectorized::ColumnsWithTypeAndName& block_data, |
463 | 322k | int row_pos) { |
464 | | // move key columns |
465 | 848k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { |
466 | 525k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), |
467 | 525k | row->_row_pos); |
468 | 525k | } |
469 | 322k | if (row->has_init_agg()) { |
470 | | // get value columns from agg_places |
471 | 81.8k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
472 | 74.5k | auto function = _agg_functions[i]; |
473 | 74.5k | auto* agg_place = row->agg_places(i); |
474 | 74.5k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
475 | 74.5k | function->insert_result_into(agg_place, *col_ptr); |
476 | | |
477 | 74.5k | if constexpr (is_final) { |
478 | 74.5k | function->destroy(agg_place); |
479 | 74.5k | } else { |
480 | 0 | function->reset(agg_place); |
481 | 0 | } |
482 | 74.5k | } |
483 | | |
484 | 7.31k | if constexpr (is_final) { |
485 | 7.31k | row->remove_init_agg(); |
486 | 7.31k | } else { |
487 | 0 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
488 | 0 | auto function = _agg_functions[i]; |
489 | 0 | auto* agg_place = row->agg_places(i); |
490 | 0 | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
491 | 0 | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
492 | 0 | row_pos, _arena); |
493 | 0 | } |
494 | 0 | } |
495 | 315k | } else { |
496 | | // move columns for rows do not need agg |
497 | 5.74M | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
498 | 5.43M | _output_mutable_block.get_column_by_position(i)->insert_from( |
499 | 5.43M | *block_data[i].column.get(), row->_row_pos); |
500 | 5.43M | } |
501 | 315k | } |
502 | 322k | if constexpr (!is_final) { |
503 | 0 | row->_row_pos = row_pos; |
504 | 0 | } |
505 | 322k | } Unexecuted instantiation: _ZN5doris8MemTable17_finalize_one_rowILb0EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi _ZN5doris8MemTable17_finalize_one_rowILb1EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi Line | Count | Source | 463 | 322k | int row_pos) { | 464 | | // move key columns | 465 | 848k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { | 466 | 525k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), | 467 | 525k | row->_row_pos); | 468 | 525k | } | 469 | 322k | if (row->has_init_agg()) { | 470 | | // get value columns from agg_places | 471 | 81.8k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 472 | 74.5k | auto function = _agg_functions[i]; | 473 | 74.5k | auto* agg_place = row->agg_places(i); | 474 | 74.5k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 475 | 74.5k | function->insert_result_into(agg_place, *col_ptr); | 476 | | | 477 | 74.5k | if constexpr (is_final) { | 478 | 74.5k | function->destroy(agg_place); | 479 | | } else { | 480 | | function->reset(agg_place); | 481 | | } | 482 | 74.5k | } | 483 | | | 484 | 7.31k | if constexpr (is_final) { | 485 | 7.31k | row->remove_init_agg(); | 486 | | } else { | 487 | | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 488 | | auto function = _agg_functions[i]; | 489 | | auto* agg_place = row->agg_places(i); | 490 | | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 491 | | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 492 | | row_pos, _arena); | 493 | | } | 494 | | } | 495 | 315k | } else { | 496 | | // move columns for rows do not need agg | 497 | 5.74M | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 498 | 5.43M | _output_mutable_block.get_column_by_position(i)->insert_from( | 499 | 5.43M | *block_data[i].column.get(), row->_row_pos); | 500 | 5.43M | } | 501 | 315k | } | 502 | | if constexpr (!is_final) { | 503 | | row->_row_pos = row_pos; | 504 | | } | 505 | 322k | } |
|
506 | | |
507 | 7.31k | void MemTable::_init_row_for_agg(RowInBlock* row, vectorized::MutableBlock& mutable_block) { |
508 | 7.31k | row->init_agg_places(_arena.aligned_alloc(_total_size_of_aggregate_states, 16), |
509 | 7.31k | _offsets_of_aggregate_states.data()); |
510 | 81.8k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { |
511 | 74.5k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
512 | 74.5k | auto* data = row->agg_places(cid); |
513 | 74.5k | _agg_functions[cid]->create(data); |
514 | 74.5k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
515 | 74.5k | row->_row_pos, _arena); |
516 | 74.5k | } |
517 | 7.31k | } |
518 | 77.0k | void MemTable::_clear_row_agg(RowInBlock* row) { |
519 | 77.0k | if (row->has_init_agg()) { |
520 | 54 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
521 | 48 | auto function = _agg_functions[i]; |
522 | 48 | auto* agg_place = row->agg_places(i); |
523 | 48 | function->destroy(agg_place); |
524 | 48 | } |
525 | 6 | row->remove_init_agg(); |
526 | 6 | } |
527 | 77.0k | } |
528 | | // only in `to_block` the `is_final` flag will be true, in other cases, it will be false |
529 | | template <bool is_final, bool has_skip_bitmap_col> |
530 | 2.15k | void MemTable::_aggregate() { |
531 | 2.15k | SCOPED_RAW_TIMER(&_stat.agg_ns); |
532 | 2.15k | _stat.agg_times++; |
533 | 2.15k | vectorized::Block in_block = _input_mutable_block.to_block(); |
534 | 2.15k | vectorized::MutableBlock mutable_block = |
535 | 2.15k | vectorized::MutableBlock::build_mutable_block(&in_block); |
536 | 2.15k | _vec_row_comparator->set_block(&mutable_block); |
537 | 2.15k | auto& block_data = in_block.get_columns_with_type_and_name(); |
538 | 2.15k | DorisVector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; |
539 | 2.15k | temp_row_in_blocks.reserve(_last_sorted_pos); |
540 | | //only init agg if needed |
541 | | |
542 | 2.15k | if constexpr (!has_skip_bitmap_col) { |
543 | 1.98k | RowInBlock* prev_row = nullptr; |
544 | 1.98k | int row_pos = -1; |
545 | 397k | for (const auto& cur_row_ptr : *_row_in_blocks) { |
546 | 397k | RowInBlock* cur_row = cur_row_ptr.get(); |
547 | 397k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { |
548 | 76.8k | if (!prev_row->has_init_agg()) { |
549 | 5.87k | _init_row_for_agg(prev_row, mutable_block); |
550 | 5.87k | } |
551 | 76.8k | _stat.merged_rows++; |
552 | 76.8k | if (_tablet_schema->has_seq_map()) { |
553 | 3 | _aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row); |
554 | 76.8k | } else { |
555 | 76.8k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, |
556 | 76.8k | prev_row); |
557 | 76.8k | } |
558 | | |
559 | | // Clean up aggregation state of the merged row to avoid memory leak |
560 | 77.0k | if (cur_row) { |
561 | 77.0k | _clear_row_agg(cur_row); |
562 | 77.0k | } |
563 | 320k | } else { |
564 | 320k | prev_row = cur_row; |
565 | 320k | if (!temp_row_in_blocks.empty()) { |
566 | | // The rows from the previous batch of _row_in_blocks have been merged into temp_row_in_blocks, |
567 | | // now call finalize to write the aggregation results into _output_mutable_block. |
568 | 318k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, |
569 | 318k | row_pos); |
570 | 318k | } |
571 | 320k | temp_row_in_blocks.push_back(cur_row_ptr); |
572 | 320k | row_pos++; |
573 | 320k | } |
574 | 397k | } |
575 | 1.99k | if (!temp_row_in_blocks.empty()) { |
576 | | // finalize the last low |
577 | 1.99k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); |
578 | 1.99k | } |
579 | 1.98k | } else { |
580 | 175 | DCHECK(_delete_sign_col_idx != -1); |
581 | 175 | if (_seq_col_idx_in_block == -1) { |
582 | 160 | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( |
583 | 160 | block_data, mutable_block, temp_row_in_blocks); |
584 | 160 | } else { |
585 | 15 | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, |
586 | 15 | temp_row_in_blocks); |
587 | 15 | } |
588 | 175 | } |
589 | 2.15k | if constexpr (!is_final) { |
590 | | // if is not final, we collect the agg results to input_block and then continue to insert |
591 | 0 | _input_mutable_block.swap(_output_mutable_block); |
592 | | //TODO(weixang):opt here. |
593 | 0 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); |
594 | 0 | _output_mutable_block = |
595 | 0 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); |
596 | 0 | _output_mutable_block.clear_column_data(); |
597 | 0 | *_row_in_blocks = temp_row_in_blocks; |
598 | 0 | _last_sorted_pos = _row_in_blocks->size(); |
599 | 0 | } |
600 | 2.15k | } Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb0EEEvv Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb1EEEvv _ZN5doris8MemTable10_aggregateILb1ELb0EEEvv Line | Count | Source | 530 | 1.98k | void MemTable::_aggregate() { | 531 | 1.98k | SCOPED_RAW_TIMER(&_stat.agg_ns); | 532 | 1.98k | _stat.agg_times++; | 533 | 1.98k | vectorized::Block in_block = _input_mutable_block.to_block(); | 534 | 1.98k | vectorized::MutableBlock mutable_block = | 535 | 1.98k | vectorized::MutableBlock::build_mutable_block(&in_block); | 536 | 1.98k | _vec_row_comparator->set_block(&mutable_block); | 537 | 1.98k | auto& block_data = in_block.get_columns_with_type_and_name(); | 538 | 1.98k | DorisVector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; | 539 | 1.98k | temp_row_in_blocks.reserve(_last_sorted_pos); | 540 | | //only init agg if needed | 541 | | | 542 | 1.98k | if constexpr (!has_skip_bitmap_col) { | 543 | 1.98k | RowInBlock* prev_row = nullptr; | 544 | 1.98k | int row_pos = -1; | 545 | 397k | for (const auto& cur_row_ptr : *_row_in_blocks) { | 546 | 397k | RowInBlock* cur_row = cur_row_ptr.get(); | 547 | 397k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 548 | 76.8k | if (!prev_row->has_init_agg()) { | 549 | 5.87k | _init_row_for_agg(prev_row, mutable_block); | 550 | 5.87k | } | 551 | 76.8k | _stat.merged_rows++; | 552 | 76.8k | if (_tablet_schema->has_seq_map()) { | 553 | 3 | _aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row); | 554 | 76.8k | } else { | 555 | 76.8k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, | 556 | 76.8k | prev_row); | 557 | 76.8k | } | 558 | | | 559 | | // Clean up aggregation state of the merged row to avoid memory leak | 560 | 77.0k | if (cur_row) { | 561 | 77.0k | _clear_row_agg(cur_row); | 562 | 77.0k | } | 563 | 320k | } else { | 564 | 320k | prev_row = cur_row; | 565 | 320k | if (!temp_row_in_blocks.empty()) { | 566 | | // The rows from the previous batch of _row_in_blocks have been merged into temp_row_in_blocks, | 567 | | // now call finalize to write the aggregation results into _output_mutable_block. | 568 | 318k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, | 569 | 318k | row_pos); | 570 | 318k | } | 571 | 320k | temp_row_in_blocks.push_back(cur_row_ptr); | 572 | 320k | row_pos++; | 573 | 320k | } | 574 | 397k | } | 575 | 1.99k | if (!temp_row_in_blocks.empty()) { | 576 | | // finalize the last low | 577 | 1.99k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); | 578 | 1.99k | } | 579 | | } else { | 580 | | DCHECK(_delete_sign_col_idx != -1); | 581 | | if (_seq_col_idx_in_block == -1) { | 582 | | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( | 583 | | block_data, mutable_block, temp_row_in_blocks); | 584 | | } else { | 585 | | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, | 586 | | temp_row_in_blocks); | 587 | | } | 588 | | } | 589 | | if constexpr (!is_final) { | 590 | | // if is not final, we collect the agg results to input_block and then continue to insert | 591 | | _input_mutable_block.swap(_output_mutable_block); | 592 | | //TODO(weixang):opt here. | 593 | | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 594 | | _output_mutable_block = | 595 | | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 596 | | _output_mutable_block.clear_column_data(); | 597 | | *_row_in_blocks = temp_row_in_blocks; | 598 | | _last_sorted_pos = _row_in_blocks->size(); | 599 | | } | 600 | 1.98k | } |
_ZN5doris8MemTable10_aggregateILb1ELb1EEEvv Line | Count | Source | 530 | 175 | void MemTable::_aggregate() { | 531 | 175 | SCOPED_RAW_TIMER(&_stat.agg_ns); | 532 | 175 | _stat.agg_times++; | 533 | 175 | vectorized::Block in_block = _input_mutable_block.to_block(); | 534 | 175 | vectorized::MutableBlock mutable_block = | 535 | 175 | vectorized::MutableBlock::build_mutable_block(&in_block); | 536 | 175 | _vec_row_comparator->set_block(&mutable_block); | 537 | 175 | auto& block_data = in_block.get_columns_with_type_and_name(); | 538 | 175 | DorisVector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; | 539 | 175 | temp_row_in_blocks.reserve(_last_sorted_pos); | 540 | | //only init agg if needed | 541 | | | 542 | | if constexpr (!has_skip_bitmap_col) { | 543 | | RowInBlock* prev_row = nullptr; | 544 | | int row_pos = -1; | 545 | | for (const auto& cur_row_ptr : *_row_in_blocks) { | 546 | | RowInBlock* cur_row = cur_row_ptr.get(); | 547 | | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 548 | | if (!prev_row->has_init_agg()) { | 549 | | _init_row_for_agg(prev_row, mutable_block); | 550 | | } | 551 | | _stat.merged_rows++; | 552 | | if (_tablet_schema->has_seq_map()) { | 553 | | _aggregate_two_row_with_sequence_map(mutable_block, cur_row, prev_row); | 554 | | } else { | 555 | | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, | 556 | | prev_row); | 557 | | } | 558 | | | 559 | | // Clean up aggregation state of the merged row to avoid memory leak | 560 | | if (cur_row) { | 561 | | _clear_row_agg(cur_row); | 562 | | } | 563 | | } else { | 564 | | prev_row = cur_row; | 565 | | if (!temp_row_in_blocks.empty()) { | 566 | | // The rows from the previous batch of _row_in_blocks have been merged into temp_row_in_blocks, | 567 | | // now call finalize to write the aggregation results into _output_mutable_block. | 568 | | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, | 569 | | row_pos); | 570 | | } | 571 | | temp_row_in_blocks.push_back(cur_row_ptr); | 572 | | row_pos++; | 573 | | } | 574 | | } | 575 | | if (!temp_row_in_blocks.empty()) { | 576 | | // finalize the last low | 577 | | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); | 578 | | } | 579 | 175 | } else { | 580 | 175 | DCHECK(_delete_sign_col_idx != -1); | 581 | 175 | if (_seq_col_idx_in_block == -1) { | 582 | 160 | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( | 583 | 160 | block_data, mutable_block, temp_row_in_blocks); | 584 | 160 | } else { | 585 | 15 | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, | 586 | 15 | temp_row_in_blocks); | 587 | 15 | } | 588 | 175 | } | 589 | | if constexpr (!is_final) { | 590 | | // if is not final, we collect the agg results to input_block and then continue to insert | 591 | | _input_mutable_block.swap(_output_mutable_block); | 592 | | //TODO(weixang):opt here. | 593 | | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 594 | | _output_mutable_block = | 595 | | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 596 | | _output_mutable_block.clear_column_data(); | 597 | | *_row_in_blocks = temp_row_in_blocks; | 598 | | _last_sorted_pos = _row_in_blocks->size(); | 599 | | } | 600 | 175 | } |
|
601 | | |
602 | | template <bool is_final> |
603 | | void MemTable::_aggregate_for_flexible_partial_update_without_seq_col( |
604 | | const vectorized::ColumnsWithTypeAndName& block_data, |
605 | | vectorized::MutableBlock& mutable_block, |
606 | 160 | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { |
607 | 160 | std::shared_ptr<RowInBlock> prev_row {nullptr}; |
608 | 160 | int row_pos = -1; |
609 | 160 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( |
610 | 160 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
611 | 160 | ->get_data(); |
612 | 160 | auto& delete_signs = assert_cast<vectorized::ColumnInt8*>( |
613 | 160 | mutable_block.mutable_columns()[_delete_sign_col_idx].get()) |
614 | 160 | ->get_data(); |
615 | 160 | std::shared_ptr<RowInBlock> row_with_delete_sign {nullptr}; |
616 | 160 | std::shared_ptr<RowInBlock> row_without_delete_sign {nullptr}; |
617 | | |
618 | 1.91k | auto finalize_rows = [&]() { |
619 | 1.91k | if (row_with_delete_sign != nullptr) { |
620 | 14 | temp_row_in_blocks.push_back(row_with_delete_sign); |
621 | 14 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); |
622 | 14 | row_with_delete_sign = nullptr; |
623 | 14 | } |
624 | 1.91k | if (row_without_delete_sign != nullptr) { |
625 | 1.74k | temp_row_in_blocks.push_back(row_without_delete_sign); |
626 | 1.74k | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); |
627 | 1.74k | row_without_delete_sign = nullptr; |
628 | 1.74k | } |
629 | | // _arena.clear(); |
630 | 1.91k | }; Unexecuted instantiation: _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEENKUlvE_clEv _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEENKUlvE_clEv Line | Count | Source | 618 | 1.91k | auto finalize_rows = [&]() { | 619 | 1.91k | if (row_with_delete_sign != nullptr) { | 620 | 14 | temp_row_in_blocks.push_back(row_with_delete_sign); | 621 | 14 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); | 622 | 14 | row_with_delete_sign = nullptr; | 623 | 14 | } | 624 | 1.91k | if (row_without_delete_sign != nullptr) { | 625 | 1.74k | temp_row_in_blocks.push_back(row_without_delete_sign); | 626 | 1.74k | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); | 627 | 1.74k | row_without_delete_sign = nullptr; | 628 | 1.74k | } | 629 | | // _arena.clear(); | 630 | 1.91k | }; |
|
631 | | |
632 | 1.77k | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { |
633 | 1.77k | if (with_delete_sign) { |
634 | 14 | row_with_delete_sign = std::move(row); |
635 | 1.75k | } else { |
636 | 1.75k | row_without_delete_sign = std::move(row); |
637 | 1.75k | } |
638 | 1.77k | }; Unexecuted instantiation: _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEENKUlSD_bE_clESD_b _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEENKUlSD_bE_clESD_b Line | Count | Source | 632 | 1.77k | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { | 633 | 1.77k | if (with_delete_sign) { | 634 | 14 | row_with_delete_sign = std::move(row); | 635 | 1.75k | } else { | 636 | 1.75k | row_without_delete_sign = std::move(row); | 637 | 1.75k | } | 638 | 1.77k | }; |
|
639 | 5.73k | for (const auto& cur_row_ptr : *_row_in_blocks) { |
640 | 5.73k | RowInBlock* cur_row = cur_row_ptr.get(); |
641 | 5.73k | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; |
642 | 5.73k | bool cur_row_has_delete_sign = (!skip_bitmap.contains(_delete_sign_col_unique_id) && |
643 | 5.73k | delete_signs[cur_row->_row_pos] != 0); |
644 | 5.73k | prev_row = |
645 | 5.73k | (row_with_delete_sign == nullptr) ? row_without_delete_sign : row_with_delete_sign; |
646 | | // compare keys, the keys of row_with_delete_sign and row_without_delete_sign is the same, |
647 | | // choose any of them if it's valid |
648 | 5.73k | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row.get(), cur_row) == 0) { |
649 | 3.98k | if (cur_row_has_delete_sign) { |
650 | 24 | if (row_without_delete_sign != nullptr) { |
651 | | // if there exits row without delete sign, remove it first |
652 | 13 | _clear_row_agg(row_without_delete_sign.get()); |
653 | 13 | _stat.merged_rows++; |
654 | 13 | row_without_delete_sign = nullptr; |
655 | 13 | } |
656 | | // and then unconditionally replace the previous row |
657 | 24 | prev_row = row_with_delete_sign; |
658 | 3.95k | } else { |
659 | 3.95k | prev_row = row_without_delete_sign; |
660 | 3.95k | } |
661 | | |
662 | 3.98k | if (prev_row == nullptr) { |
663 | 17 | add_row(cur_row_ptr, cur_row_has_delete_sign); |
664 | 3.96k | } else { |
665 | 3.96k | if (!prev_row->has_init_agg()) { |
666 | 1.44k | _init_row_for_agg(prev_row.get(), mutable_block); |
667 | 1.44k | } |
668 | 3.96k | _stat.merged_rows++; |
669 | 3.96k | _aggregate_two_row_in_block<true>(mutable_block, cur_row, prev_row.get()); |
670 | 3.96k | } |
671 | 3.98k | } else { |
672 | 1.75k | finalize_rows(); |
673 | 1.75k | add_row(cur_row_ptr, cur_row_has_delete_sign); |
674 | 1.75k | } |
675 | 5.73k | } |
676 | | // finalize the last lows |
677 | 160 | finalize_rows(); |
678 | 160 | } Unexecuted instantiation: _ZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEE _ZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEE Line | Count | Source | 606 | 160 | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { | 607 | 160 | std::shared_ptr<RowInBlock> prev_row {nullptr}; | 608 | 160 | int row_pos = -1; | 609 | 160 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( | 610 | 160 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 611 | 160 | ->get_data(); | 612 | 160 | auto& delete_signs = assert_cast<vectorized::ColumnInt8*>( | 613 | 160 | mutable_block.mutable_columns()[_delete_sign_col_idx].get()) | 614 | 160 | ->get_data(); | 615 | 160 | std::shared_ptr<RowInBlock> row_with_delete_sign {nullptr}; | 616 | 160 | std::shared_ptr<RowInBlock> row_without_delete_sign {nullptr}; | 617 | | | 618 | 160 | auto finalize_rows = [&]() { | 619 | 160 | if (row_with_delete_sign != nullptr) { | 620 | 160 | temp_row_in_blocks.push_back(row_with_delete_sign); | 621 | 160 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); | 622 | 160 | row_with_delete_sign = nullptr; | 623 | 160 | } | 624 | 160 | if (row_without_delete_sign != nullptr) { | 625 | 160 | temp_row_in_blocks.push_back(row_without_delete_sign); | 626 | 160 | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); | 627 | 160 | row_without_delete_sign = nullptr; | 628 | 160 | } | 629 | | // _arena.clear(); | 630 | 160 | }; | 631 | | | 632 | 160 | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { | 633 | 160 | if (with_delete_sign) { | 634 | 160 | row_with_delete_sign = std::move(row); | 635 | 160 | } else { | 636 | 160 | row_without_delete_sign = std::move(row); | 637 | 160 | } | 638 | 160 | }; | 639 | 5.73k | for (const auto& cur_row_ptr : *_row_in_blocks) { | 640 | 5.73k | RowInBlock* cur_row = cur_row_ptr.get(); | 641 | 5.73k | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; | 642 | 5.73k | bool cur_row_has_delete_sign = (!skip_bitmap.contains(_delete_sign_col_unique_id) && | 643 | 5.73k | delete_signs[cur_row->_row_pos] != 0); | 644 | 5.73k | prev_row = | 645 | 5.73k | (row_with_delete_sign == nullptr) ? row_without_delete_sign : row_with_delete_sign; | 646 | | // compare keys, the keys of row_with_delete_sign and row_without_delete_sign is the same, | 647 | | // choose any of them if it's valid | 648 | 5.73k | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row.get(), cur_row) == 0) { | 649 | 3.98k | if (cur_row_has_delete_sign) { | 650 | 24 | if (row_without_delete_sign != nullptr) { | 651 | | // if there exits row without delete sign, remove it first | 652 | 13 | _clear_row_agg(row_without_delete_sign.get()); | 653 | 13 | _stat.merged_rows++; | 654 | 13 | row_without_delete_sign = nullptr; | 655 | 13 | } | 656 | | // and then unconditionally replace the previous row | 657 | 24 | prev_row = row_with_delete_sign; | 658 | 3.95k | } else { | 659 | 3.95k | prev_row = row_without_delete_sign; | 660 | 3.95k | } | 661 | | | 662 | 3.98k | if (prev_row == nullptr) { | 663 | 17 | add_row(cur_row_ptr, cur_row_has_delete_sign); | 664 | 3.96k | } else { | 665 | 3.96k | if (!prev_row->has_init_agg()) { | 666 | 1.44k | _init_row_for_agg(prev_row.get(), mutable_block); | 667 | 1.44k | } | 668 | 3.96k | _stat.merged_rows++; | 669 | 3.96k | _aggregate_two_row_in_block<true>(mutable_block, cur_row, prev_row.get()); | 670 | 3.96k | } | 671 | 3.98k | } else { | 672 | 1.75k | finalize_rows(); | 673 | 1.75k | add_row(cur_row_ptr, cur_row_has_delete_sign); | 674 | 1.75k | } | 675 | 5.73k | } | 676 | | // finalize the last lows | 677 | 160 | finalize_rows(); | 678 | 160 | } |
|
679 | | |
680 | | template <bool is_final> |
681 | | void MemTable::_aggregate_for_flexible_partial_update_with_seq_col( |
682 | | const vectorized::ColumnsWithTypeAndName& block_data, |
683 | | vectorized::MutableBlock& mutable_block, |
684 | 15 | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { |
685 | | // For flexible partial update, when table has sequence column, we don't do any aggregation |
686 | | // in memtable. These duplicate rows will be aggregated in VerticalSegmentWriter |
687 | 15 | int row_pos = -1; |
688 | 482 | for (const auto& row_ptr : *_row_in_blocks) { |
689 | 482 | RowInBlock* row = row_ptr.get(); |
690 | 482 | temp_row_in_blocks.push_back(row_ptr); |
691 | 482 | _finalize_one_row<is_final>(row, block_data, ++row_pos); |
692 | 482 | } |
693 | 15 | } Unexecuted instantiation: _ZN5doris8MemTable51_aggregate_for_flexible_partial_update_with_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEE _ZN5doris8MemTable51_aggregate_for_flexible_partial_update_with_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEENS_18CustomStdAllocatorISD_NS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEE Line | Count | Source | 684 | 15 | DorisVector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { | 685 | | // For flexible partial update, when table has sequence column, we don't do any aggregation | 686 | | // in memtable. These duplicate rows will be aggregated in VerticalSegmentWriter | 687 | 15 | int row_pos = -1; | 688 | 482 | for (const auto& row_ptr : *_row_in_blocks) { | 689 | 482 | RowInBlock* row = row_ptr.get(); | 690 | 482 | temp_row_in_blocks.push_back(row_ptr); | 691 | 482 | _finalize_one_row<is_final>(row, block_data, ++row_pos); | 692 | 482 | } | 693 | 15 | } |
|
694 | | |
695 | 0 | void MemTable::shrink_memtable_by_agg() { |
696 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
697 | 0 | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
698 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
699 | 0 | if (_keys_type == KeysType::DUP_KEYS) { |
700 | 0 | return; |
701 | 0 | } |
702 | 0 | size_t same_keys_num = _sort(); |
703 | 0 | if (same_keys_num != 0) { |
704 | 0 | (_skip_bitmap_col_idx == -1) ? _aggregate<false, false>() : _aggregate<false, true>(); |
705 | 0 | } |
706 | 0 | _last_agg_pos = memory_usage(); |
707 | 0 | } |
708 | | |
709 | 114k | bool MemTable::need_flush() const { |
710 | 114k | DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); |
711 | 114k | auto max_size = _adaptive_write_buffer_size(); |
712 | 114k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
713 | 1.22k | auto update_columns_size = _num_columns; |
714 | 1.22k | auto min_buffer_size = config::min_write_buffer_size_for_partial_update; |
715 | 1.22k | max_size = max_size * update_columns_size / _tablet_schema->num_columns(); |
716 | 1.22k | max_size = max_size > min_buffer_size ? max_size : min_buffer_size; |
717 | 1.22k | } |
718 | | |
719 | 114k | if (memory_usage() >= max_size) { |
720 | 0 | g_flush_cuz_memtable_full << 1; |
721 | 0 | return true; |
722 | 0 | } |
723 | 114k | return false; |
724 | 114k | } |
725 | | |
726 | 114k | int64_t MemTable::_adaptive_write_buffer_size() const { |
727 | 114k | if (!config::enable_adaptive_write_buffer_size) [[unlikely]] { |
728 | 0 | return config::write_buffer_size; |
729 | 0 | } |
730 | 114k | const int64_t current_load_mem_value = MemoryProfile::load_current_usage(); |
731 | 114k | int64_t factor = 4; |
732 | | // Memory usage intervals: |
733 | | // (80 %, 100 %] → 1× buffer |
734 | | // (50 %, 80 %] → 2× buffer |
735 | | // [0 %, 50 %] → 4× buffer |
736 | 114k | if (current_load_mem_value > (_load_mem_limit * 4) / 5) { // > 80 % |
737 | 0 | factor = 1; |
738 | 114k | } else if (current_load_mem_value > _load_mem_limit / 2) { // > 50 % |
739 | 0 | factor = 2; |
740 | 0 | } |
741 | 114k | return config::write_buffer_size * factor; |
742 | 114k | } |
743 | | |
744 | 114k | bool MemTable::need_agg() const { |
745 | 114k | if (_keys_type == KeysType::AGG_KEYS) { |
746 | 4.95k | auto max_size = _last_agg_pos + config::write_buffer_size_for_agg; |
747 | 4.95k | return memory_usage() >= max_size; |
748 | 4.95k | } |
749 | 109k | return false; |
750 | 114k | } |
751 | | |
752 | 54.7k | size_t MemTable::get_flush_reserve_memory_size() const { |
753 | 54.7k | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
754 | 15 | return 0; // no need to reserve |
755 | 15 | } |
756 | 54.7k | return static_cast<size_t>(static_cast<double>(_input_mutable_block.allocated_bytes()) * 1.2); |
757 | 54.7k | } |
758 | | |
759 | 54.6k | Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { |
760 | 54.6k | size_t same_keys_num = _sort(); |
761 | 54.6k | if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { |
762 | 52.5k | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
763 | 15 | _output_mutable_block.swap(_input_mutable_block); |
764 | 52.5k | } else { |
765 | 52.5k | vectorized::Block in_block = _input_mutable_block.to_block(); |
766 | 52.5k | RETURN_IF_ERROR(_put_into_output(in_block)); |
767 | 52.5k | } |
768 | 52.5k | } else { |
769 | 2.09k | (_skip_bitmap_col_idx == -1) ? _aggregate<true, false>() : _aggregate<true, true>(); |
770 | 2.09k | } |
771 | 54.6k | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow && |
772 | 54.6k | !_tablet_schema->cluster_key_uids().empty()) { |
773 | 1.55k | if (_partial_update_mode != UniqueKeyUpdateModePB::UPSERT) { |
774 | 0 | return Status::InternalError( |
775 | 0 | "Partial update for mow with cluster keys is not supported"); |
776 | 0 | } |
777 | 1.55k | RETURN_IF_ERROR(_sort_by_cluster_keys()); |
778 | 1.55k | } |
779 | 54.6k | _input_mutable_block.clear(); |
780 | 54.6k | *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); |
781 | 54.6k | return Status::OK(); |
782 | 54.6k | } |
783 | | |
784 | 54.6k | Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) { |
785 | 54.6k | RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res)); |
786 | 54.7k | return Status::OK(); |
787 | 54.6k | } |
788 | | |
789 | | #include "common/compile_check_end.h" |
790 | | } // namespace doris |