/root/doris/be/src/olap/memtable.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/memtable.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <pdqsort.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <limits> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "bvar/bvar.h" |
30 | | #include "common/config.h" |
31 | | #include "olap/memtable_memory_limiter.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/tablet_schema.h" |
34 | | #include "runtime/descriptors.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "util/debug_points.h" |
38 | | #include "util/runtime_profile.h" |
39 | | #include "util/stopwatch.hpp" |
40 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
41 | | #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
42 | | #include "vec/columns/column.h" |
43 | | |
44 | | namespace doris { |
45 | | |
46 | | bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt"); |
47 | | |
48 | | using namespace ErrorCode; |
49 | | |
50 | | MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
51 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
52 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info) |
53 | | : _mem_type(MemType::ACTIVE), |
54 | | _tablet_id(tablet_id), |
55 | | _enable_unique_key_mow(enable_unique_key_mow), |
56 | | _keys_type(tablet_schema->keys_type()), |
57 | | _tablet_schema(tablet_schema), |
58 | | _is_first_insertion(true), |
59 | | _agg_functions(tablet_schema->num_columns()), |
60 | | _offsets_of_aggregate_states(tablet_schema->num_columns()), |
61 | 459k | _total_size_of_aggregate_states(0) { |
62 | 459k | g_memtable_cnt << 1; |
63 | 459k | _query_thread_context.init_unlocked(); |
64 | 459k | _arena = std::make_unique<vectorized::Arena>(); |
65 | 459k | _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema); |
66 | 459k | _num_columns = _tablet_schema->num_columns(); |
67 | 459k | if (partial_update_info != nullptr) { |
68 | 458k | _partial_update_mode = partial_update_info->update_mode(); |
69 | 458k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
70 | 4.51k | _num_columns = partial_update_info->partial_update_input_columns.size(); |
71 | 4.51k | if (partial_update_info->is_schema_contains_auto_inc_column && |
72 | 4.51k | !partial_update_info->is_input_columns_contains_auto_inc_column) { |
73 | 386 | _is_partial_update_and_auto_inc = true; |
74 | 386 | _num_columns += 1; |
75 | 386 | } |
76 | 4.51k | } |
77 | 458k | } |
78 | | // TODO: Support ZOrderComparator in the future |
79 | 459k | _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); |
80 | 459k | _mem_tracker = std::make_shared<MemTracker>(); |
81 | 459k | } |
82 | | |
83 | | void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
84 | 458k | const TupleDescriptor* tuple_desc) { |
85 | 5.32M | for (auto slot_desc : *slot_descs) { |
86 | 5.32M | const auto& slots = tuple_desc->slots(); |
87 | 77.1M | for (int j = 0; j < slots.size(); ++j) { |
88 | 77.1M | if (slot_desc->id() == slots[j]->id()) { |
89 | 5.31M | _column_offset.emplace_back(j); |
90 | 5.31M | break; |
91 | 5.31M | } |
92 | 77.1M | } |
93 | 5.32M | } |
94 | 458k | if (_is_partial_update_and_auto_inc) { |
95 | 386 | _column_offset.emplace_back(_column_offset.size()); |
96 | 386 | } |
97 | 458k | } |
98 | | |
99 | 52.0k | void MemTable::_init_agg_functions(const vectorized::Block* block) { |
100 | 563k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
101 | 511k | vectorized::AggregateFunctionPtr function; |
102 | 511k | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) { |
103 | | // In such table, non-key column's aggregation type is NONE, so we need to construct |
104 | | // the aggregate function manually. |
105 | 432k | if (_skip_bitmap_col_idx != cid) { |
106 | 431k | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
107 | 431k | "replace_load", {block->get_data_type(cid)}, |
108 | 431k | block->get_data_type(cid)->is_nullable(), |
109 | 431k | BeExecVersionManager::get_newest_version()); |
110 | 431k | } else { |
111 | 522 | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
112 | 522 | "bitmap_intersect", {block->get_data_type(cid)}, false, |
113 | 522 | BeExecVersionManager::get_newest_version()); |
114 | 522 | } |
115 | 432k | } else { |
116 | 79.5k | function = _tablet_schema->column(cid).get_aggregate_function( |
117 | 79.5k | vectorized::AGG_LOAD_SUFFIX, _tablet_schema->column(cid).get_be_exec_version()); |
118 | 79.5k | if (function == nullptr) { |
119 | 0 | LOG(WARNING) << "column get aggregate function failed, column=" |
120 | 0 | << _tablet_schema->column(cid).name(); |
121 | 0 | } |
122 | 79.5k | } |
123 | | |
124 | 511k | DCHECK(function != nullptr); |
125 | 511k | _agg_functions[cid] = function; |
126 | 511k | } |
127 | | |
128 | 563k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
129 | 511k | _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states; |
130 | 511k | _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data(); |
131 | | |
132 | | // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. |
133 | 511k | if (cid + 1 < _num_columns) { |
134 | 459k | size_t alignment_of_next_state = _agg_functions[cid + 1]->align_of_data(); |
135 | | |
136 | | /// Extend total_size to next alignment requirement |
137 | | /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. |
138 | 459k | _total_size_of_aggregate_states = |
139 | 459k | (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
140 | 459k | alignment_of_next_state * alignment_of_next_state; |
141 | 459k | } |
142 | 511k | } |
143 | 52.0k | } |
144 | | |
145 | 460k | MemTable::~MemTable() { |
146 | 460k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); |
147 | 460k | { |
148 | 460k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
149 | 460k | g_memtable_cnt << -1; |
150 | 460k | if (_keys_type != KeysType::DUP_KEYS) { |
151 | 21.1M | for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) { |
152 | 21.0M | if (!(*it)->has_init_agg()) { |
153 | 21.0M | continue; |
154 | 21.0M | } |
155 | | // We should release agg_places here, because they are not released when a |
156 | | // load is canceled. |
157 | 18.4E | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
158 | 0 | auto function = _agg_functions[i]; |
159 | 0 | DCHECK(function != nullptr); |
160 | 0 | function->destroy((*it)->agg_places(i)); |
161 | 0 | } |
162 | 18.4E | } |
163 | 144k | } |
164 | | // Arena has to be destroyed after agg state, because some agg state's memory may be |
165 | | // allocated in arena. |
166 | 460k | _arena.reset(); |
167 | 460k | _vec_row_comparator.reset(); |
168 | 460k | _row_in_blocks.clear(); |
169 | 460k | _agg_functions.clear(); |
170 | 460k | _input_mutable_block.clear(); |
171 | 460k | _output_mutable_block.clear(); |
172 | 460k | } |
173 | 460k | if (_is_flush_success) { |
174 | | // If the memtable is flush success, then its memtracker's consumption should be 0 |
175 | 101k | if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { |
176 | 0 | LOG(FATAL) << "memtable flush success but cosumption is not 0, it is " |
177 | 0 | << _mem_tracker->consumption(); |
178 | 0 | } |
179 | 101k | } |
180 | 460k | } |
181 | | |
182 | 613k | int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { |
183 | 613k | return _pblock->compare_at(left->_row_pos, right->_row_pos, _tablet_schema->num_key_columns(), |
184 | 613k | *_pblock, -1); |
185 | 613k | } |
186 | | |
187 | | Status MemTable::insert(const vectorized::Block* input_block, |
188 | 197k | const std::vector<uint32_t>& row_idxs) { |
189 | 197k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
190 | | |
191 | 197k | if (_is_first_insertion) { |
192 | 102k | _is_first_insertion = false; |
193 | 102k | auto clone_block = input_block->clone_without_columns(&_column_offset); |
194 | 102k | _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
195 | 102k | _vec_row_comparator->set_block(&_input_mutable_block); |
196 | 102k | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
197 | 102k | if (_tablet_schema->has_sequence_col()) { |
198 | 1.77k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
199 | | // for unique key fixed partial update, sequence column index in block |
200 | | // may be different with the index in `_tablet_schema` |
201 | 916 | for (size_t i = 0; i < clone_block.columns(); i++) { |
202 | 716 | if (clone_block.get_by_position(i).name == SEQUENCE_COL) { |
203 | 52 | _seq_col_idx_in_block = i; |
204 | 52 | break; |
205 | 52 | } |
206 | 716 | } |
207 | 1.51k | } else { |
208 | 1.51k | _seq_col_idx_in_block = _tablet_schema->sequence_col_idx(); |
209 | 1.51k | } |
210 | 1.77k | } |
211 | 102k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS && |
212 | 102k | _tablet_schema->has_skip_bitmap_col()) { |
213 | | // init of _skip_bitmap_col_idx and _delete_sign_col_idx must be before _init_agg_functions() |
214 | 520 | _skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); |
215 | 520 | _delete_sign_col_idx = _tablet_schema->delete_sign_idx(); |
216 | 520 | _delete_sign_col_unique_id = _tablet_schema->column(_delete_sign_col_idx).unique_id(); |
217 | 520 | if (_seq_col_idx_in_block != -1) { |
218 | 50 | _seq_col_unique_id = _tablet_schema->column(_seq_col_idx_in_block).unique_id(); |
219 | 50 | } |
220 | 520 | } |
221 | 102k | if (_keys_type != KeysType::DUP_KEYS) { |
222 | | // there may be additional intermediate columns in input_block |
223 | | // we only need columns indicated by column offset in the output |
224 | 52.0k | RETURN_IF_CATCH_EXCEPTION(_init_agg_functions(&clone_block)); |
225 | 52.0k | } |
226 | 102k | } |
227 | | |
228 | 197k | auto num_rows = row_idxs.size(); |
229 | 197k | size_t cursor_in_mutableblock = _input_mutable_block.rows(); |
230 | 197k | RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), |
231 | 197k | row_idxs.data() + num_rows, &_column_offset)); |
232 | 87.8M | for (int i = 0; i < num_rows; i++) { |
233 | 87.6M | _row_in_blocks.emplace_back(std::make_shared<RowInBlock>(cursor_in_mutableblock + i)); |
234 | 87.6M | } |
235 | | |
236 | 197k | _stat.raw_rows += num_rows; |
237 | 197k | return Status::OK(); |
238 | 197k | } |
239 | | |
240 | | template <bool has_skip_bitmap_col> |
241 | | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, |
242 | 175k | RowInBlock* src_row, RowInBlock* dst_row) { |
243 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row |
244 | | // both specify the sequence column, or src_row and dst_row both don't specify the |
245 | | // sequence column |
246 | 175k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { |
247 | 189 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); |
248 | 189 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); |
249 | 189 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); |
250 | | // dst sequence column larger than src, don't need to update |
251 | 189 | if (res > 0) { |
252 | 103 | return; |
253 | 103 | } |
254 | | // need to update the row pos in dst row to the src row pos when has |
255 | | // sequence column |
256 | 86 | dst_row->_row_pos = src_row->_row_pos; |
257 | 86 | } |
258 | | // dst is non-sequence row, or dst sequence is smaller |
259 | 175k | if constexpr (!has_skip_bitmap_col) { |
260 | 7.91k | DCHECK(_skip_bitmap_col_idx == -1); |
261 | 723k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
262 | 555k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
263 | 555k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
264 | 555k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
265 | 555k | src_row->_row_pos, _arena.get()); |
266 | 555k | } |
267 | 167k | } else { |
268 | 7.91k | DCHECK(_skip_bitmap_col_idx != -1); |
269 | 7.91k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); |
270 | 7.91k | const BitmapValue& skip_bitmap = |
271 | 7.91k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( |
272 | 7.91k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
273 | 7.91k | ->get_data()[src_row->_row_pos]; |
274 | 145k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
275 | 137k | const auto& col = _tablet_schema->column(cid); |
276 | 137k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { |
277 | 68.5k | continue; |
278 | 68.5k | } |
279 | 68.8k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
280 | 68.8k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
281 | 68.8k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
282 | 68.8k | src_row->_row_pos, _arena.get()); |
283 | 68.8k | } |
284 | 7.91k | } |
285 | 175k | } _ZN5doris8MemTable27_aggregate_two_row_in_blockILb0EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 242 | 168k | RowInBlock* src_row, RowInBlock* dst_row) { | 243 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 244 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 245 | | // sequence column | 246 | 168k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 247 | 189 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 248 | 189 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 249 | 189 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 250 | | // dst sequence column larger than src, don't need to update | 251 | 189 | if (res > 0) { | 252 | 103 | return; | 253 | 103 | } | 254 | | // need to update the row pos in dst row to the src row pos when has | 255 | | // sequence column | 256 | 86 | dst_row->_row_pos = src_row->_row_pos; | 257 | 86 | } | 258 | | // dst is non-sequence row, or dst sequence is smaller | 259 | 167k | if constexpr (!has_skip_bitmap_col) { | 260 | 167k | DCHECK(_skip_bitmap_col_idx == -1); | 261 | 723k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 262 | 555k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 263 | 555k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 264 | 555k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 265 | 555k | src_row->_row_pos, _arena.get()); | 266 | 555k | } | 267 | 167k | } else { | 268 | 167k | DCHECK(_skip_bitmap_col_idx != -1); | 269 | 167k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 270 | 167k | const BitmapValue& skip_bitmap = | 271 | 167k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( | 272 | 167k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 273 | 
167k | ->get_data()[src_row->_row_pos]; | 274 | 167k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 275 | 167k | const auto& col = _tablet_schema->column(cid); | 276 | 167k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 277 | 167k | continue; | 278 | 167k | } | 279 | 167k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 280 | 167k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 281 | 167k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 282 | 167k | src_row->_row_pos, _arena.get()); | 283 | 167k | } | 284 | 167k | } | 285 | 167k | } |
_ZN5doris8MemTable27_aggregate_two_row_in_blockILb1EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 242 | 7.91k | RowInBlock* src_row, RowInBlock* dst_row) { | 243 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 244 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 245 | | // sequence column | 246 | 7.91k | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 247 | 0 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 248 | 0 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 249 | 0 | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 250 | | // dst sequence column larger than src, don't need to update | 251 | 0 | if (res > 0) { | 252 | 0 | return; | 253 | 0 | } | 254 | | // need to update the row pos in dst row to the src row pos when has | 255 | | // sequence column | 256 | 0 | dst_row->_row_pos = src_row->_row_pos; | 257 | 0 | } | 258 | | // dst is non-sequence row, or dst sequence is smaller | 259 | 7.91k | if constexpr (!has_skip_bitmap_col) { | 260 | 7.91k | DCHECK(_skip_bitmap_col_idx == -1); | 261 | 7.91k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 262 | 7.91k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 263 | 7.91k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 264 | 7.91k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 265 | 7.91k | src_row->_row_pos, _arena.get()); | 266 | 7.91k | } | 267 | 7.91k | } else { | 268 | 7.91k | DCHECK(_skip_bitmap_col_idx != -1); | 269 | 7.91k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 270 | 7.91k | const BitmapValue& skip_bitmap = | 271 | 7.91k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( | 272 | 7.91k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 273 | 7.91k | 
->get_data()[src_row->_row_pos]; | 274 | 145k | for (uint32_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 275 | 137k | const auto& col = _tablet_schema->column(cid); | 276 | 137k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 277 | 68.5k | continue; | 278 | 68.5k | } | 279 | 68.8k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 280 | 68.8k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 281 | 68.8k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 282 | 68.8k | src_row->_row_pos, _arena.get()); | 283 | 68.8k | } | 284 | 7.91k | } | 285 | 7.91k | } |
|
286 | 98.0k | Status MemTable::_put_into_output(vectorized::Block& in_block) { |
287 | 98.0k | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
288 | 98.0k | std::vector<uint32_t> row_pos_vec; |
289 | 98.0k | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
290 | 98.0k | row_pos_vec.reserve(in_block.rows()); |
291 | 86.9M | for (int i = 0; i < _row_in_blocks.size(); i++) { |
292 | 86.8M | row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); |
293 | 86.8M | } |
294 | 98.0k | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
295 | 98.0k | row_pos_vec.data() + in_block.rows()); |
296 | 98.0k | } |
297 | | |
298 | 102k | size_t MemTable::_sort() { |
299 | 102k | SCOPED_RAW_TIMER(&_stat.sort_ns); |
300 | 102k | _stat.sort_times++; |
301 | 102k | size_t same_keys_num = 0; |
302 | | // sort new rows |
303 | 102k | Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size()); |
304 | 437k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { |
305 | 720M | auto cmp = [&](RowInBlock* lhs, RowInBlock* rhs) -> int { |
306 | 720M | return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); |
307 | 720M | }; |
308 | 335k | _sort_one_column(_row_in_blocks, tie, cmp); |
309 | 335k | } |
310 | 102k | bool is_dup = (_keys_type == KeysType::DUP_KEYS); |
311 | | // sort extra round by _row_pos to make the sort stable |
312 | 102k | auto iter = tie.iter(); |
313 | 606k | while (iter.next()) { |
314 | 504k | pdqsort(std::next(_row_in_blocks.begin(), iter.left()), |
315 | 504k | std::next(_row_in_blocks.begin(), iter.right()), |
316 | 504k | [&is_dup](const std::shared_ptr<RowInBlock>& lhs, |
317 | 131M | const std::shared_ptr<RowInBlock>& rhs) -> bool { |
318 | 131M | return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; |
319 | 131M | }); |
320 | 504k | same_keys_num += iter.right() - iter.left(); |
321 | 504k | } |
322 | | // merge new rows and old rows |
323 | 102k | _vec_row_comparator->set_block(&_input_mutable_block); |
324 | 102k | auto cmp_func = [this, is_dup, &same_keys_num](const std::shared_ptr<RowInBlock>& l, |
325 | 102k | const std::shared_ptr<RowInBlock>& r) -> bool { |
326 | 0 | auto value = (*(this->_vec_row_comparator))(l.get(), r.get()); |
327 | 0 | if (value == 0) { |
328 | 0 | same_keys_num++; |
329 | 0 | return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos; |
330 | 0 | } else { |
331 | 0 | return value < 0; |
332 | 0 | } |
333 | 0 | }; |
334 | 102k | auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos); |
335 | 102k | std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func); |
336 | 102k | _last_sorted_pos = _row_in_blocks.size(); |
337 | 102k | return same_keys_num; |
338 | 102k | } |
339 | | |
340 | 1 | Status MemTable::_sort_by_cluster_keys() { |
341 | 1 | SCOPED_RAW_TIMER(&_stat.sort_ns); |
342 | 1 | _stat.sort_times++; |
343 | | // sort all rows |
344 | 1 | vectorized::Block in_block = _output_mutable_block.to_block(); |
345 | 1 | vectorized::MutableBlock mutable_block = |
346 | 1 | vectorized::MutableBlock::build_mutable_block(&in_block); |
347 | 1 | auto clone_block = in_block.clone_without_columns(); |
348 | 1 | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
349 | | |
350 | 1 | std::vector<std::shared_ptr<RowInBlock>> row_in_blocks; |
351 | 1 | row_in_blocks.reserve(mutable_block.rows()); |
352 | 5 | for (size_t i = 0; i < mutable_block.rows(); i++) { |
353 | 4 | row_in_blocks.emplace_back(std::make_shared<RowInBlock>(i)); |
354 | 4 | } |
355 | 1 | Tie tie = Tie(0, mutable_block.rows()); |
356 | | |
357 | 2 | for (auto cid : _tablet_schema->cluster_key_idxes()) { |
358 | 2 | auto index = _tablet_schema->field_index(cid); |
359 | 2 | if (index == -1) { |
360 | 0 | return Status::InternalError("could not find cluster key column with unique_id=" + |
361 | 0 | std::to_string(cid) + " in tablet schema"); |
362 | 0 | } |
363 | 8 | auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { |
364 | 8 | return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1); |
365 | 8 | }; |
366 | 2 | _sort_one_column(row_in_blocks, tie, cmp); |
367 | 2 | } |
368 | | |
369 | | // sort extra round by _row_pos to make the sort stable |
370 | 1 | auto iter = tie.iter(); |
371 | 1 | while (iter.next()) { |
372 | 0 | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
373 | 0 | std::next(row_in_blocks.begin(), iter.right()), |
374 | 0 | [](const std::shared_ptr<RowInBlock>& lhs, const std::shared_ptr<RowInBlock>& rhs) |
375 | 0 | -> bool { return lhs->_row_pos < rhs->_row_pos; }); |
376 | 0 | } |
377 | | |
378 | 1 | in_block = mutable_block.to_block(); |
379 | 1 | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
380 | 1 | std::vector<uint32_t> row_pos_vec; |
381 | 1 | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
382 | 1 | row_pos_vec.reserve(in_block.rows()); |
383 | 5 | for (int i = 0; i < row_in_blocks.size(); i++) { |
384 | 4 | row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); |
385 | 4 | } |
386 | 1 | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
387 | 1 | row_pos_vec.data() + in_block.rows(), &_column_offset); |
388 | 1 | } |
389 | | |
390 | | void MemTable::_sort_one_column(std::vector<std::shared_ptr<RowInBlock>>& row_in_blocks, Tie& tie, |
391 | 335k | std::function<int(RowInBlock*, RowInBlock*)> cmp) { |
392 | 335k | auto iter = tie.iter(); |
393 | 10.4M | while (iter.next()) { |
394 | 10.1M | pdqsort(std::next(row_in_blocks.begin(), static_cast<int>(iter.left())), |
395 | 10.1M | std::next(row_in_blocks.begin(), static_cast<int>(iter.right())), |
396 | 625M | [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs.get(), rhs.get()) < 0; }); |
397 | 10.1M | tie[iter.left()] = 0; |
398 | 134M | for (auto i = iter.left() + 1; i < iter.right(); i++) { |
399 | 124M | tie[i] = (cmp(row_in_blocks[i - 1].get(), row_in_blocks[i].get()) == 0); |
400 | 124M | } |
401 | 10.1M | } |
402 | 335k | } |
403 | | |
404 | | template <bool is_final> |
405 | | void MemTable::_finalize_one_row(RowInBlock* row, |
406 | | const vectorized::ColumnsWithTypeAndName& block_data, |
407 | 442k | int row_pos) { |
408 | | // move key columns |
409 | 1.29M | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { |
410 | 848k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), |
411 | 848k | row->_row_pos); |
412 | 848k | } |
413 | 442k | if (row->has_init_agg()) { |
414 | | // get value columns from agg_places |
415 | 158k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
416 | 143k | auto function = _agg_functions[i]; |
417 | 143k | auto* agg_place = row->agg_places(i); |
418 | 143k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
419 | 143k | function->insert_result_into(agg_place, *col_ptr); |
420 | | |
421 | 143k | if constexpr (is_final) { |
422 | 0 | function->destroy(agg_place); |
423 | 0 | } else { |
424 | 0 | function->reset(agg_place); |
425 | 0 | } |
426 | 143k | } |
427 | | |
428 | 14.8k | if constexpr (is_final) { |
429 | 0 | row->remove_init_agg(); |
430 | 0 | } else { |
431 | 0 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
432 | 0 | auto function = _agg_functions[i]; |
433 | 0 | auto* agg_place = row->agg_places(i); |
434 | 0 | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
435 | 0 | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
436 | 0 | row_pos, _arena.get()); |
437 | 0 | } |
438 | 0 | } |
439 | 427k | } else { |
440 | | // move columns for rows do not need agg |
441 | 7.44M | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
442 | 7.01M | _output_mutable_block.get_column_by_position(i)->insert_from( |
443 | 7.01M | *block_data[i].column.get(), row->_row_pos); |
444 | 7.01M | } |
445 | 427k | } |
446 | 442k | if constexpr (!is_final) { |
447 | 0 | row->_row_pos = row_pos; |
448 | 0 | } |
449 | 442k | } Unexecuted instantiation: _ZN5doris8MemTable17_finalize_one_rowILb0EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi _ZN5doris8MemTable17_finalize_one_rowILb1EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi Line | Count | Source | 407 | 442k | int row_pos) { | 408 | | // move key columns | 409 | 1.29M | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { | 410 | 848k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), | 411 | 848k | row->_row_pos); | 412 | 848k | } | 413 | 442k | if (row->has_init_agg()) { | 414 | | // get value columns from agg_places | 415 | 158k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 416 | 143k | auto function = _agg_functions[i]; | 417 | 143k | auto* agg_place = row->agg_places(i); | 418 | 143k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 419 | 143k | function->insert_result_into(agg_place, *col_ptr); | 420 | | | 421 | 143k | if constexpr (is_final) { | 422 | 143k | function->destroy(agg_place); | 423 | 143k | } else { | 424 | 143k | function->reset(agg_place); | 425 | 143k | } | 426 | 143k | } | 427 | | | 428 | 14.8k | if constexpr (is_final) { | 429 | 14.8k | row->remove_init_agg(); | 430 | 14.8k | } else { | 431 | 14.8k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 432 | 14.8k | auto function = _agg_functions[i]; | 433 | 14.8k | auto* agg_place = row->agg_places(i); | 434 | 14.8k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 435 | 14.8k | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 436 | 14.8k | row_pos, _arena.get()); | 437 | 14.8k | } | 438 | 14.8k | } | 439 | 427k | } else { | 440 | | // move columns for rows do not need agg | 441 | 7.44M | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 442 | 7.01M | 
_output_mutable_block.get_column_by_position(i)->insert_from( | 443 | 7.01M | *block_data[i].column.get(), row->_row_pos); | 444 | 7.01M | } | 445 | 427k | } | 446 | 442k | if constexpr (!is_final) { | 447 | 442k | row->_row_pos = row_pos; | 448 | 442k | } | 449 | 442k | } |
|
450 | | |
451 | 14.8k | void MemTable::_init_row_for_agg(RowInBlock* row, vectorized::MutableBlock& mutable_block) { |
452 | 14.8k | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), |
453 | 14.8k | _offsets_of_aggregate_states.data()); |
454 | 158k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { |
455 | 143k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
456 | 143k | auto* data = row->agg_places(cid); |
457 | 143k | _agg_functions[cid]->create(data); |
458 | 143k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
459 | 143k | row->_row_pos, _arena.get()); |
460 | 143k | } |
461 | 14.8k | } |
462 | 26 | void MemTable::_clear_row_agg(RowInBlock* row) { |
463 | 26 | if (row->has_init_agg()) { |
464 | 108 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
465 | 96 | auto function = _agg_functions[i]; |
466 | 96 | auto* agg_place = row->agg_places(i); |
467 | 96 | function->destroy(agg_place); |
468 | 96 | } |
469 | 12 | row->remove_init_agg(); |
470 | 12 | } |
471 | 26 | } |
472 | | |
473 | | template <bool is_final, bool has_skip_bitmap_col> |
474 | 3.78k | void MemTable::_aggregate() { |
475 | 3.78k | SCOPED_RAW_TIMER(&_stat.agg_ns); |
476 | 3.78k | _stat.agg_times++; |
477 | 3.78k | vectorized::Block in_block = _input_mutable_block.to_block(); |
478 | 3.78k | vectorized::MutableBlock mutable_block = |
479 | 3.78k | vectorized::MutableBlock::build_mutable_block(&in_block); |
480 | 3.78k | _vec_row_comparator->set_block(&mutable_block); |
481 | 3.78k | auto& block_data = in_block.get_columns_with_type_and_name(); |
482 | 3.78k | std::vector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; |
483 | 3.78k | temp_row_in_blocks.reserve(_last_sorted_pos); |
484 | | //only init agg if needed |
485 | | |
486 | 3.78k | if constexpr (!has_skip_bitmap_col) { |
487 | 340 | RowInBlock* prev_row = nullptr; |
488 | 340 | int row_pos = -1; |
489 | 606k | for (const auto& cur_row_ptr : _row_in_blocks) { |
490 | 606k | RowInBlock* cur_row = cur_row_ptr.get(); |
491 | 606k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { |
492 | 168k | if (!prev_row->has_init_agg()) { |
493 | 12.0k | _init_row_for_agg(prev_row, mutable_block); |
494 | 12.0k | } |
495 | 168k | _stat.merged_rows++; |
496 | 168k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); |
497 | 438k | } else { |
498 | 438k | prev_row = cur_row; |
499 | 438k | if (!temp_row_in_blocks.empty()) { |
500 | | // no more rows to merge for prev row, finalize it |
501 | 434k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, |
502 | 434k | row_pos); |
503 | 434k | } |
504 | 438k | temp_row_in_blocks.push_back(cur_row_ptr); |
505 | 438k | row_pos++; |
506 | 438k | } |
507 | 606k | } |
508 | 3.44k | if (!temp_row_in_blocks.empty()) { |
509 | | // finalize the last low |
510 | 3.44k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); |
511 | 3.44k | } |
512 | 3.37k | } else { |
513 | 340 | DCHECK(_delete_sign_col_idx != -1); |
514 | 340 | if (_seq_col_idx_in_block == -1) { |
515 | 310 | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( |
516 | 310 | block_data, mutable_block, temp_row_in_blocks); |
517 | 310 | } else { |
518 | 30 | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, |
519 | 30 | temp_row_in_blocks); |
520 | 30 | } |
521 | 340 | } |
522 | 3.78k | if constexpr (!is_final) { |
523 | | // if is not final, we collect the agg results to input_block and then continue to insert |
524 | 0 | _input_mutable_block.swap(_output_mutable_block); |
525 | | //TODO(weixang):opt here. |
526 | 0 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); |
527 | 0 | _output_mutable_block = |
528 | 0 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); |
529 | 0 | _output_mutable_block.clear_column_data(); |
530 | 0 | _row_in_blocks = temp_row_in_blocks; |
531 | 0 | _last_sorted_pos = _row_in_blocks.size(); |
532 | 0 | } |
533 | 3.78k | } Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb0EEEvv Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb1EEEvv _ZN5doris8MemTable10_aggregateILb1ELb0EEEvv Line | Count | Source | 474 | 3.44k | void MemTable::_aggregate() { | 475 | 3.44k | SCOPED_RAW_TIMER(&_stat.agg_ns); | 476 | 3.44k | _stat.agg_times++; | 477 | 3.44k | vectorized::Block in_block = _input_mutable_block.to_block(); | 478 | 3.44k | vectorized::MutableBlock mutable_block = | 479 | 3.44k | vectorized::MutableBlock::build_mutable_block(&in_block); | 480 | 3.44k | _vec_row_comparator->set_block(&mutable_block); | 481 | 3.44k | auto& block_data = in_block.get_columns_with_type_and_name(); | 482 | 3.44k | std::vector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; | 483 | 3.44k | temp_row_in_blocks.reserve(_last_sorted_pos); | 484 | | //only init agg if needed | 485 | | | 486 | 3.44k | if constexpr (!has_skip_bitmap_col) { | 487 | 3.37k | RowInBlock* prev_row = nullptr; | 488 | 3.37k | int row_pos = -1; | 489 | 606k | for (const auto& cur_row_ptr : _row_in_blocks) { | 490 | 606k | RowInBlock* cur_row = cur_row_ptr.get(); | 491 | 606k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 492 | 168k | if (!prev_row->has_init_agg()) { | 493 | 12.0k | _init_row_for_agg(prev_row, mutable_block); | 494 | 12.0k | } | 495 | 168k | _stat.merged_rows++; | 496 | 168k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 497 | 438k | } else { | 498 | 438k | prev_row = cur_row; | 499 | 438k | if (!temp_row_in_blocks.empty()) { | 500 | | // no more rows to merge for prev row, finalize it | 501 | 434k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, | 502 | 434k | row_pos); | 503 | 434k | } | 504 | 438k | temp_row_in_blocks.push_back(cur_row_ptr); | 505 | 438k | row_pos++; | 506 | 438k | } | 507 | 606k | } | 508 | 3.44k | if (!temp_row_in_blocks.empty()) { | 509 | | // finalize the 
last low | 510 | 3.44k | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); | 511 | 3.44k | } | 512 | 3.37k | } else { | 513 | 3.44k | DCHECK(_delete_sign_col_idx != -1); | 514 | 3.44k | if (_seq_col_idx_in_block == -1) { | 515 | 3.44k | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( | 516 | 3.44k | block_data, mutable_block, temp_row_in_blocks); | 517 | 3.44k | } else { | 518 | 3.44k | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, | 519 | 3.44k | temp_row_in_blocks); | 520 | 3.44k | } | 521 | 3.44k | } | 522 | 3.44k | if constexpr (!is_final) { | 523 | | // if is not final, we collect the agg results to input_block and then continue to insert | 524 | 3.44k | _input_mutable_block.swap(_output_mutable_block); | 525 | | //TODO(weixang):opt here. | 526 | 3.44k | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 527 | 3.44k | _output_mutable_block = | 528 | 3.44k | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 529 | 3.44k | _output_mutable_block.clear_column_data(); | 530 | 3.44k | _row_in_blocks = temp_row_in_blocks; | 531 | 3.44k | _last_sorted_pos = _row_in_blocks.size(); | 532 | 3.44k | } | 533 | 3.44k | } |
_ZN5doris8MemTable10_aggregateILb1ELb1EEEvv Line | Count | Source | 474 | 340 | void MemTable::_aggregate() { | 475 | 340 | SCOPED_RAW_TIMER(&_stat.agg_ns); | 476 | 340 | _stat.agg_times++; | 477 | 340 | vectorized::Block in_block = _input_mutable_block.to_block(); | 478 | 340 | vectorized::MutableBlock mutable_block = | 479 | 340 | vectorized::MutableBlock::build_mutable_block(&in_block); | 480 | 340 | _vec_row_comparator->set_block(&mutable_block); | 481 | 340 | auto& block_data = in_block.get_columns_with_type_and_name(); | 482 | 340 | std::vector<std::shared_ptr<RowInBlock>> temp_row_in_blocks; | 483 | 340 | temp_row_in_blocks.reserve(_last_sorted_pos); | 484 | | //only init agg if needed | 485 | | | 486 | 340 | if constexpr (!has_skip_bitmap_col) { | 487 | 340 | RowInBlock* prev_row = nullptr; | 488 | 340 | int row_pos = -1; | 489 | 340 | for (const auto& cur_row_ptr : _row_in_blocks) { | 490 | 340 | RowInBlock* cur_row = cur_row_ptr.get(); | 491 | 340 | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 492 | 340 | if (!prev_row->has_init_agg()) { | 493 | 340 | _init_row_for_agg(prev_row, mutable_block); | 494 | 340 | } | 495 | 340 | _stat.merged_rows++; | 496 | 340 | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 497 | 340 | } else { | 498 | 340 | prev_row = cur_row; | 499 | 340 | if (!temp_row_in_blocks.empty()) { | 500 | | // no more rows to merge for prev row, finalize it | 501 | 340 | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, | 502 | 340 | row_pos); | 503 | 340 | } | 504 | 340 | temp_row_in_blocks.push_back(cur_row_ptr); | 505 | 340 | row_pos++; | 506 | 340 | } | 507 | 340 | } | 508 | 340 | if (!temp_row_in_blocks.empty()) { | 509 | | // finalize the last low | 510 | 340 | _finalize_one_row<is_final>(temp_row_in_blocks.back().get(), block_data, row_pos); | 511 | 340 | } | 512 | 340 | } else { | 513 | 340 | DCHECK(_delete_sign_col_idx != -1); | 514 | 
340 | if (_seq_col_idx_in_block == -1) { | 515 | 310 | _aggregate_for_flexible_partial_update_without_seq_col<is_final>( | 516 | 310 | block_data, mutable_block, temp_row_in_blocks); | 517 | 310 | } else { | 518 | 30 | _aggregate_for_flexible_partial_update_with_seq_col<is_final>(block_data, mutable_block, | 519 | 30 | temp_row_in_blocks); | 520 | 30 | } | 521 | 340 | } | 522 | 340 | if constexpr (!is_final) { | 523 | | // if is not final, we collect the agg results to input_block and then continue to insert | 524 | 340 | _input_mutable_block.swap(_output_mutable_block); | 525 | | //TODO(weixang):opt here. | 526 | 340 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 527 | 340 | _output_mutable_block = | 528 | 340 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 529 | 340 | _output_mutable_block.clear_column_data(); | 530 | 340 | _row_in_blocks = temp_row_in_blocks; | 531 | 340 | _last_sorted_pos = _row_in_blocks.size(); | 532 | 340 | } | 533 | 340 | } |
|
534 | | |
535 | | template <bool is_final> |
536 | | void MemTable::_aggregate_for_flexible_partial_update_without_seq_col( |
537 | | const vectorized::ColumnsWithTypeAndName& block_data, |
538 | | vectorized::MutableBlock& mutable_block, |
539 | 310 | std::vector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { |
540 | 310 | std::shared_ptr<RowInBlock> prev_row {nullptr}; |
541 | 310 | int row_pos = -1; |
542 | 310 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( |
543 | 310 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
544 | 310 | ->get_data(); |
545 | 310 | auto& delete_signs = assert_cast<vectorized::ColumnInt8*>( |
546 | 310 | mutable_block.mutable_columns()[_delete_sign_col_idx].get()) |
547 | 310 | ->get_data(); |
548 | 310 | std::shared_ptr<RowInBlock> row_with_delete_sign {nullptr}; |
549 | 310 | std::shared_ptr<RowInBlock> row_without_delete_sign {nullptr}; |
550 | | |
551 | 3.76k | auto finalize_rows = [&]() { |
552 | 3.76k | if (row_with_delete_sign != nullptr) { |
553 | 28 | temp_row_in_blocks.push_back(row_with_delete_sign); |
554 | 28 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); |
555 | 28 | row_with_delete_sign = nullptr; |
556 | 28 | } |
557 | 3.76k | if (row_without_delete_sign != nullptr) { |
558 | 3.43k | temp_row_in_blocks.push_back(row_without_delete_sign); |
559 | 3.43k | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); |
560 | 3.43k | row_without_delete_sign = nullptr; |
561 | 3.43k | } |
562 | | // _arena.clear(); |
563 | 3.76k | }; Unexecuted instantiation: _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EEENKUlvE_clEv _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EEENKUlvE_clEv Line | Count | Source | 551 | 3.76k | auto finalize_rows = [&]() { | 552 | 3.76k | if (row_with_delete_sign != nullptr) { | 553 | 28 | temp_row_in_blocks.push_back(row_with_delete_sign); | 554 | 28 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); | 555 | 28 | row_with_delete_sign = nullptr; | 556 | 28 | } | 557 | 3.76k | if (row_without_delete_sign != nullptr) { | 558 | 3.43k | temp_row_in_blocks.push_back(row_without_delete_sign); | 559 | 3.43k | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); | 560 | 3.43k | row_without_delete_sign = nullptr; | 561 | 3.43k | } | 562 | | // _arena.clear(); | 563 | 3.76k | }; |
|
564 | | |
565 | 3.48k | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { |
566 | 3.48k | if (with_delete_sign) { |
567 | 28 | row_with_delete_sign = std::move(row); |
568 | 3.45k | } else { |
569 | 3.45k | row_without_delete_sign = std::move(row); |
570 | 3.45k | } |
571 | 3.48k | }; Unexecuted instantiation: _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EEENKUlSD_bE_clESD_b _ZZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EEENKUlSD_bE_clESD_b Line | Count | Source | 565 | 3.48k | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { | 566 | 3.48k | if (with_delete_sign) { | 567 | 28 | row_with_delete_sign = std::move(row); | 568 | 3.45k | } else { | 569 | 3.45k | row_without_delete_sign = std::move(row); | 570 | 3.45k | } | 571 | 3.48k | }; |
|
572 | 11.4k | for (const auto& cur_row_ptr : _row_in_blocks) { |
573 | 11.4k | RowInBlock* cur_row = cur_row_ptr.get(); |
574 | 11.4k | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; |
575 | 11.4k | bool cur_row_has_delete_sign = (!skip_bitmap.contains(_delete_sign_col_unique_id) && |
576 | 11.4k | delete_signs[cur_row->_row_pos] != 0); |
577 | 11.4k | prev_row = |
578 | 11.4k | (row_with_delete_sign == nullptr) ? row_without_delete_sign : row_with_delete_sign; |
579 | | // compare keys, the keys of row_with_delete_sign and row_without_delete_sign is the same, |
580 | | // choose any of them if it's valid |
581 | 11.4k | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row.get(), cur_row) == 0) { |
582 | 7.95k | if (cur_row_has_delete_sign) { |
583 | 48 | if (row_without_delete_sign != nullptr) { |
584 | | // if there exits row without delete sign, remove it first |
585 | 26 | _clear_row_agg(row_without_delete_sign.get()); |
586 | 26 | _stat.merged_rows++; |
587 | 26 | row_without_delete_sign = nullptr; |
588 | 26 | } |
589 | | // and then unconditionally replace the previous row |
590 | 48 | prev_row = row_with_delete_sign; |
591 | 7.90k | } else { |
592 | 7.90k | prev_row = row_without_delete_sign; |
593 | 7.90k | } |
594 | | |
595 | 7.95k | if (prev_row == nullptr) { |
596 | 34 | add_row(cur_row_ptr, cur_row_has_delete_sign); |
597 | 7.91k | } else { |
598 | 7.91k | if (!prev_row->has_init_agg()) { |
599 | 2.86k | _init_row_for_agg(prev_row.get(), mutable_block); |
600 | 2.86k | } |
601 | 7.91k | _stat.merged_rows++; |
602 | 7.91k | _aggregate_two_row_in_block<true>(mutable_block, cur_row, prev_row.get()); |
603 | 7.91k | } |
604 | 7.95k | } else { |
605 | 3.44k | finalize_rows(); |
606 | 3.44k | add_row(cur_row_ptr, cur_row_has_delete_sign); |
607 | 3.44k | } |
608 | 11.4k | } |
609 | | // finalize the last lows |
610 | 310 | finalize_rows(); |
611 | 310 | } Unexecuted instantiation: _ZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EE _ZN5doris8MemTable54_aggregate_for_flexible_partial_update_without_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EE Line | Count | Source | 539 | 310 | std::vector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { | 540 | 310 | std::shared_ptr<RowInBlock> prev_row {nullptr}; | 541 | 310 | int row_pos = -1; | 542 | 310 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( | 543 | 310 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 544 | 310 | ->get_data(); | 545 | 310 | auto& delete_signs = assert_cast<vectorized::ColumnInt8*>( | 546 | 310 | mutable_block.mutable_columns()[_delete_sign_col_idx].get()) | 547 | 310 | ->get_data(); | 548 | 310 | std::shared_ptr<RowInBlock> row_with_delete_sign {nullptr}; | 549 | 310 | std::shared_ptr<RowInBlock> row_without_delete_sign {nullptr}; | 550 | | | 551 | 310 | auto finalize_rows = [&]() { | 552 | 310 | if (row_with_delete_sign != nullptr) { | 553 | 310 | temp_row_in_blocks.push_back(row_with_delete_sign); | 554 | 310 | _finalize_one_row<is_final>(row_with_delete_sign.get(), block_data, ++row_pos); | 555 | 310 | row_with_delete_sign = nullptr; | 556 | 310 | } | 557 | 310 | if (row_without_delete_sign != nullptr) { | 558 | 310 | temp_row_in_blocks.push_back(row_without_delete_sign); | 559 | 310 | _finalize_one_row<is_final>(row_without_delete_sign.get(), block_data, ++row_pos); | 560 | 310 | row_without_delete_sign = nullptr; | 561 | 310 | } | 562 | | // _arena.clear(); | 563 | 310 | }; | 564 | | | 565 | 310 | auto add_row = [&](std::shared_ptr<RowInBlock> row, bool with_delete_sign) { | 566 | 310 | if (with_delete_sign) { | 567 | 310 | row_with_delete_sign = 
std::move(row); | 568 | 310 | } else { | 569 | 310 | row_without_delete_sign = std::move(row); | 570 | 310 | } | 571 | 310 | }; | 572 | 11.4k | for (const auto& cur_row_ptr : _row_in_blocks) { | 573 | 11.4k | RowInBlock* cur_row = cur_row_ptr.get(); | 574 | 11.4k | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; | 575 | 11.4k | bool cur_row_has_delete_sign = (!skip_bitmap.contains(_delete_sign_col_unique_id) && | 576 | 11.4k | delete_signs[cur_row->_row_pos] != 0); | 577 | 11.4k | prev_row = | 578 | 11.4k | (row_with_delete_sign == nullptr) ? row_without_delete_sign : row_with_delete_sign; | 579 | | // compare keys, the keys of row_with_delete_sign and row_without_delete_sign is the same, | 580 | | // choose any of them if it's valid | 581 | 11.4k | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row.get(), cur_row) == 0) { | 582 | 7.95k | if (cur_row_has_delete_sign) { | 583 | 48 | if (row_without_delete_sign != nullptr) { | 584 | | // if there exits row without delete sign, remove it first | 585 | 26 | _clear_row_agg(row_without_delete_sign.get()); | 586 | 26 | _stat.merged_rows++; | 587 | 26 | row_without_delete_sign = nullptr; | 588 | 26 | } | 589 | | // and then unconditionally replace the previous row | 590 | 48 | prev_row = row_with_delete_sign; | 591 | 7.90k | } else { | 592 | 7.90k | prev_row = row_without_delete_sign; | 593 | 7.90k | } | 594 | | | 595 | 7.95k | if (prev_row == nullptr) { | 596 | 34 | add_row(cur_row_ptr, cur_row_has_delete_sign); | 597 | 7.91k | } else { | 598 | 7.91k | if (!prev_row->has_init_agg()) { | 599 | 2.86k | _init_row_for_agg(prev_row.get(), mutable_block); | 600 | 2.86k | } | 601 | 7.91k | _stat.merged_rows++; | 602 | 7.91k | _aggregate_two_row_in_block<true>(mutable_block, cur_row, prev_row.get()); | 603 | 7.91k | } | 604 | 7.95k | } else { | 605 | 3.44k | finalize_rows(); | 606 | 3.44k | add_row(cur_row_ptr, cur_row_has_delete_sign); | 607 | 3.44k | } | 608 | 11.4k | } | 609 | | // finalize the last 
lows | 610 | 310 | finalize_rows(); | 611 | 310 | } |
|
612 | | |
613 | | template <bool is_final> |
614 | | void MemTable::_aggregate_for_flexible_partial_update_with_seq_col( |
615 | | const vectorized::ColumnsWithTypeAndName& block_data, |
616 | | vectorized::MutableBlock& mutable_block, |
617 | 30 | std::vector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { |
618 | | // For flexible partial update, when table has sequence column, we don't do any aggregation |
619 | | // in memtable. These duplicate rows will be aggregated in VerticalSegmentWriter |
620 | 30 | int row_pos = -1; |
621 | 964 | for (const auto& row_ptr : _row_in_blocks) { |
622 | 964 | RowInBlock* row = row_ptr.get(); |
623 | 964 | temp_row_in_blocks.push_back(row_ptr); |
624 | 964 | _finalize_one_row<is_final>(row, block_data, ++row_pos); |
625 | 964 | } |
626 | 30 | } Unexecuted instantiation: _ZN5doris8MemTable51_aggregate_for_flexible_partial_update_with_seq_colILb0EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EE _ZN5doris8MemTable51_aggregate_for_flexible_partial_update_with_seq_colILb1EEEvRKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS4_EERNS3_12MutableBlockERS2_ISt10shared_ptrINS_10RowInBlockEESaISD_EE Line | Count | Source | 617 | 30 | std::vector<std::shared_ptr<RowInBlock>>& temp_row_in_blocks) { | 618 | | // For flexible partial update, when table has sequence column, we don't do any aggregation | 619 | | // in memtable. These duplicate rows will be aggregated in VerticalSegmentWriter | 620 | 30 | int row_pos = -1; | 621 | 964 | for (const auto& row_ptr : _row_in_blocks) { | 622 | 964 | RowInBlock* row = row_ptr.get(); | 623 | 964 | temp_row_in_blocks.push_back(row_ptr); | 624 | 964 | _finalize_one_row<is_final>(row, block_data, ++row_pos); | 625 | 964 | } | 626 | 30 | } |
|
627 | | |
628 | 0 | void MemTable::shrink_memtable_by_agg() { |
629 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
630 | 0 | if (_keys_type == KeysType::DUP_KEYS) { |
631 | 0 | return; |
632 | 0 | } |
633 | 0 | size_t same_keys_num = _sort(); |
634 | 0 | if (same_keys_num != 0) { |
635 | 0 | (_skip_bitmap_col_idx == -1) ? _aggregate<false, false>() : _aggregate<false, true>(); |
636 | 0 | } |
637 | 0 | } |
638 | | |
639 | 197k | bool MemTable::need_flush() const { |
640 | 197k | DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); |
641 | 197k | auto max_size = config::write_buffer_size; |
642 | 197k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
643 | 1.69k | auto update_columns_size = _num_columns; |
644 | 1.69k | auto min_buffer_size = config::min_write_buffer_size_for_partial_update; |
645 | 1.69k | max_size = max_size * update_columns_size / _tablet_schema->num_columns(); |
646 | 1.69k | max_size = max_size > min_buffer_size ? max_size : min_buffer_size; |
647 | 1.69k | } |
648 | 197k | return memory_usage() >= max_size; |
649 | 197k | } |
650 | | |
651 | 197k | bool MemTable::need_agg() const { |
652 | 197k | if (_keys_type == KeysType::AGG_KEYS) { |
653 | 7.99k | auto max_size = config::write_buffer_size_for_agg; |
654 | 7.99k | return memory_usage() >= max_size; |
655 | 7.99k | } |
656 | 189k | return false; |
657 | 197k | } |
658 | | |
659 | 102k | Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { |
660 | 102k | size_t same_keys_num = _sort(); |
661 | 102k | if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { |
662 | 98.2k | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
663 | 30 | _output_mutable_block.swap(_input_mutable_block); |
664 | 98.2k | } else { |
665 | 98.2k | vectorized::Block in_block = _input_mutable_block.to_block(); |
666 | 98.2k | RETURN_IF_ERROR(_put_into_output(in_block)); |
667 | 98.2k | } |
668 | 98.2k | } else { |
669 | 3.78k | (_skip_bitmap_col_idx == -1) ? _aggregate<true, false>() : _aggregate<true, true>(); |
670 | 3.78k | } |
671 | 102k | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow && |
672 | 102k | !_tablet_schema->cluster_key_idxes().empty()) { |
673 | 1 | if (_partial_update_mode != UniqueKeyUpdateModePB::UPSERT) { |
674 | 0 | return Status::InternalError( |
675 | 0 | "Partial update for mow with cluster keys is not supported"); |
676 | 0 | } |
677 | 1 | RETURN_IF_ERROR(_sort_by_cluster_keys()); |
678 | 1 | } |
679 | 102k | _input_mutable_block.clear(); |
680 | 102k | *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); |
681 | 102k | return Status::OK(); |
682 | 102k | } |
683 | | |
// Public wrapper around _to_block(): writes the resulting block into `*res`.
// Returns Status::OK() on success; a failure from _to_block() is returned to the caller
// by the macro below.
Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
    // NOTE(review): going by its name, the macro also converts exceptions thrown by
    // _to_block() into an error Status - confirm against the macro definition.
    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
    return Status::OK();
}
688 | | |
689 | | } // namespace doris |