/root/doris/be/src/olap/memtable.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/memtable.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <pdqsort.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <limits> |
26 | | #include <string> |
27 | | #include <vector> |
28 | | |
29 | | #include "bvar/bvar.h" |
30 | | #include "common/config.h" |
31 | | #include "olap/memtable_memory_limiter.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/tablet_schema.h" |
34 | | #include "runtime/descriptors.h" |
35 | | #include "runtime/exec_env.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "util/debug_points.h" |
38 | | #include "util/runtime_profile.h" |
39 | | #include "util/stopwatch.hpp" |
40 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
41 | | #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
42 | | #include "vec/columns/column.h" |
43 | | |
44 | | namespace doris { |
45 | | #include "common/compile_check_begin.h" |
46 | | |
47 | | bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt"); |
48 | | |
49 | | using namespace ErrorCode; |
50 | | |
51 | | MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema, |
52 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
53 | | bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info, |
54 | | const std::shared_ptr<ResourceContext>& resource_ctx) |
55 | | : _mem_type(MemType::ACTIVE), |
56 | | _tablet_id(tablet_id), |
57 | | _enable_unique_key_mow(enable_unique_key_mow), |
58 | | _keys_type(tablet_schema->keys_type()), |
59 | | _tablet_schema(tablet_schema), |
60 | | _resource_ctx(resource_ctx), |
61 | | _is_first_insertion(true), |
62 | 84.1k | _agg_functions(tablet_schema->num_columns()), |
63 | 84.1k | _offsets_of_aggregate_states(tablet_schema->num_columns()), |
64 | 84.1k | _total_size_of_aggregate_states(0) { |
65 | 84.1k | g_memtable_cnt << 1; |
66 | 84.1k | _mem_tracker = std::make_shared<MemTracker>(); |
67 | 84.1k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
68 | 84.1k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
69 | 84.1k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
70 | 84.1k | _arena = std::make_unique<vectorized::Arena>(); |
71 | 84.1k | _vec_row_comparator = std::make_shared<RowInBlockComparator>(_tablet_schema); |
72 | 84.1k | _num_columns = _tablet_schema->num_columns(); |
73 | 84.1k | if (partial_update_info != nullptr) { |
74 | 84.1k | _partial_update_mode = partial_update_info->update_mode(); |
75 | 2.45k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
76 | 2.45k | _num_columns = partial_update_info->partial_update_input_columns.size(); |
77 | 2.45k | if (partial_update_info->is_schema_contains_auto_inc_column && |
78 | 193 | !partial_update_info->is_input_columns_contains_auto_inc_column) { |
79 | 193 | _is_partial_update_and_auto_inc = true; |
80 | 193 | _num_columns += 1; |
81 | 2.45k | } |
82 | 84.1k | } |
83 | | } |
84 | 84.1k | // TODO: Support ZOrderComparator in the future |
85 | 84.1k | _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); |
86 | 84.1k | _row_in_blocks = std::make_unique<DorisVector<RowInBlock*>>(); |
87 | | } |
88 | | |
89 | 84.1k | void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
90 | 1.17M | const TupleDescriptor* tuple_desc) { |
91 | 1.17M | for (auto slot_desc : *slot_descs) { |
92 | 17.8M | const auto& slots = tuple_desc->slots(); |
93 | 17.8M | for (int j = 0; j < slots.size(); ++j) { |
94 | 1.17M | if (slot_desc->id() == slots[j]->id()) { |
95 | 1.17M | _column_offset.emplace_back(j); |
96 | 1.17M | break; |
97 | 17.8M | } |
98 | 1.17M | } |
99 | 84.1k | } |
100 | 193 | if (_is_partial_update_and_auto_inc) { |
101 | 193 | _column_offset.emplace_back(_column_offset.size()); |
102 | 84.1k | } |
103 | | } |
104 | 29.4k | |
105 | 352k | void MemTable::_init_agg_functions(const vectorized::Block* block) { |
106 | 322k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
107 | 322k | vectorized::AggregateFunctionPtr function; |
108 | | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow) { |
109 | | // In such table, non-key column's aggregation type is NONE, so we need to construct |
110 | 274k | // the aggregate function manually. |
111 | 273k | if (_skip_bitmap_col_idx != cid) { |
112 | 273k | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
113 | 273k | "replace_load", {block->get_data_type(cid)}, |
114 | 273k | block->get_data_type(cid)->is_nullable(), |
115 | 273k | BeExecVersionManager::get_newest_version()); |
116 | 278 | } else { |
117 | 278 | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
118 | 278 | "bitmap_intersect", {block->get_data_type(cid)}, false, |
119 | 278 | BeExecVersionManager::get_newest_version()); |
120 | 274k | } |
121 | 48.5k | } else { |
122 | 48.5k | function = _tablet_schema->column(cid).get_aggregate_function( |
123 | 48.5k | vectorized::AGG_LOAD_SUFFIX, _tablet_schema->column(cid).get_be_exec_version()); |
124 | 0 | if (function == nullptr) { |
125 | 0 | LOG(WARNING) << "column get aggregate function failed, column=" |
126 | 0 | << _tablet_schema->column(cid).name(); |
127 | 48.5k | } |
128 | | } |
129 | 322k | |
130 | 322k | DCHECK(function != nullptr); |
131 | 322k | _agg_functions[cid] = function; |
132 | | } |
133 | 352k | |
134 | 322k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
135 | 322k | _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states; |
136 | | _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data(); |
137 | | |
138 | 322k | // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. |
139 | 293k | if (cid + 1 < _num_columns) { |
140 | | size_t alignment_of_next_state = _agg_functions[cid + 1]->align_of_data(); |
141 | | |
142 | | /// Extend total_size to next alignment requirement |
143 | 293k | /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. |
144 | 293k | _total_size_of_aggregate_states = |
145 | 293k | (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
146 | 293k | alignment_of_next_state * alignment_of_next_state; |
147 | 322k | } |
148 | 29.4k | } |
149 | | } |
150 | 84.1k | |
151 | 84.1k | MemTable::~MemTable() { |
152 | 84.1k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
153 | 84.1k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
154 | 84.1k | { |
155 | 84.1k | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
156 | 84.1k | g_memtable_cnt << -1; |
157 | 14.1M | if (_keys_type != KeysType::DUP_KEYS) { |
158 | 14.1M | for (auto it = _row_in_blocks->begin(); it != _row_in_blocks->end(); it++) { |
159 | 14.1M | if (!(*it)->has_init_agg()) { |
160 | 14.1M | continue; |
161 | | } |
162 | | // We should release agg_places here, because they are not released when a |
163 | 18.4E | // load is canceled. |
164 | 0 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
165 | 0 | auto function = _agg_functions[i]; |
166 | 0 | DCHECK(function != nullptr); |
167 | 0 | function->destroy((*it)->agg_places(i)); |
168 | 18.4E | } |
169 | 42.7k | } |
170 | | } |
171 | 84.1k | |
172 | 84.1k | std::for_each(_row_in_blocks->begin(), _row_in_blocks->end(), |
173 | | std::default_delete<RowInBlock>()); |
174 | | // Arena has to be destroyed after agg state, because some agg state's memory may be |
175 | 84.1k | // allocated in arena. |
176 | 84.1k | _arena.reset(); |
177 | 84.1k | _vec_row_comparator.reset(); |
178 | 84.1k | _row_in_blocks.reset(); |
179 | 84.1k | _agg_functions.clear(); |
180 | 84.1k | _input_mutable_block.clear(); |
181 | 84.1k | _output_mutable_block.clear(); |
182 | 84.1k | } |
183 | | if (_is_flush_success) { |
184 | 63.4k | // If the memtable is flush success, then its memtracker's consumption should be 0 |
185 | 0 | if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) { |
186 | 0 | LOG(FATAL) << "memtable flush success but cosumption is not 0, it is " |
187 | 0 | << _mem_tracker->consumption(); |
188 | 63.4k | } |
189 | 84.1k | } |
190 | | } |
191 | 418k | |
192 | 418k | int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { |
193 | 418k | return _pblock->compare_at(left->_row_pos, right->_row_pos, _tablet_schema->num_key_columns(), |
194 | 418k | *_pblock, -1); |
195 | | } |
196 | | |
197 | 137k | Status MemTable::insert(const vectorized::Block* input_block, |
198 | 137k | const DorisVector<uint32_t>& row_idxs) { |
199 | 137k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
200 | 137k | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
201 | | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
202 | 137k | |
203 | 63.4k | if (_is_first_insertion) { |
204 | 63.4k | _is_first_insertion = false; |
205 | 63.4k | auto clone_block = input_block->clone_without_columns(&_column_offset); |
206 | 63.4k | _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
207 | 63.4k | _vec_row_comparator->set_block(&_input_mutable_block); |
208 | 63.4k | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
209 | 1.26k | if (_tablet_schema->has_sequence_col()) { |
210 | | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
211 | | // for unique key fixed partial update, sequence column index in block |
212 | 510 | // may be different with the index in `_tablet_schema` |
213 | 411 | for (int32_t i = 0; i < clone_block.columns(); i++) { |
214 | 38 | if (clone_block.get_by_position(i).name == SEQUENCE_COL) { |
215 | 38 | _seq_col_idx_in_block = i; |
216 | 38 | break; |
217 | 411 | } |
218 | 1.13k | } |
219 | 1.13k | } else { |
220 | 1.13k | _seq_col_idx_in_block = _tablet_schema->sequence_col_idx(); |
221 | 1.26k | } |
222 | 63.4k | } |
223 | 63.4k | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS && |
224 | | _tablet_schema->has_skip_bitmap_col()) { |
225 | 278 | // init of _skip_bitmap_col_idx must be before _init_agg_functions() |
226 | 278 | _skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx(); |
227 | 24 | if (_seq_col_idx_in_block != -1) { |
228 | 24 | _seq_col_unique_id = _tablet_schema->column(_seq_col_idx_in_block).unique_id(); |
229 | 278 | } |
230 | 63.4k | } |
231 | | if (_keys_type != KeysType::DUP_KEYS) { |
232 | | // there may be additional intermediate columns in input_block |
233 | 29.4k | // we only need columns indicated by column offset in the output |
234 | 29.4k | RETURN_IF_CATCH_EXCEPTION(_init_agg_functions(&clone_block)); |
235 | 63.4k | } |
236 | | } |
237 | 137k | |
238 | 137k | auto num_rows = row_idxs.size(); |
239 | 137k | size_t cursor_in_mutableblock = _input_mutable_block.rows(); |
240 | 137k | RETURN_IF_ERROR(_input_mutable_block.add_rows(input_block, row_idxs.data(), |
241 | 37.5M | row_idxs.data() + num_rows, &_column_offset)); |
242 | 37.3M | for (int i = 0; i < num_rows; i++) { |
243 | 37.3M | _row_in_blocks->emplace_back(new RowInBlock {cursor_in_mutableblock + i}); |
244 | | } |
245 | 137k | |
246 | 137k | _stat.raw_rows += num_rows; |
247 | 137k | return Status::OK(); |
248 | | } |
249 | | |
250 | | template <bool has_skip_bitmap_col> |
251 | 98.7k | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, |
252 | | RowInBlock* src_row, RowInBlock* dst_row) { |
253 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row |
254 | | // both specify the sequence column, or src_row and dst_row both don't specify the |
255 | 98.7k | // sequence column |
256 | 326 | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { |
257 | 326 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); |
258 | 326 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); |
259 | | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); |
260 | 326 | // dst sequence column larger than src, don't need to update |
261 | 106 | if (res > 0) { |
262 | 106 | return; |
263 | | } |
264 | | // need to update the row pos in dst row to the src row pos when has |
265 | 220 | // sequence column |
266 | 220 | dst_row->_row_pos = src_row->_row_pos; |
267 | | } |
268 | 98.6k | // dst is non-sequence row, or dst sequence is smaller |
269 | 4.07k | if constexpr (!has_skip_bitmap_col) { |
270 | 401k | DCHECK(_skip_bitmap_col_idx == -1); |
271 | 307k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
272 | 307k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
273 | 307k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
274 | 307k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
275 | 307k | src_row->_row_pos, _arena.get()); |
276 | 94.4k | } |
277 | 4.07k | } else { |
278 | 4.07k | DCHECK(_skip_bitmap_col_idx != -1); |
279 | 4.07k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); |
280 | 4.07k | const BitmapValue& skip_bitmap = |
281 | 4.07k | assert_cast<vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( |
282 | 4.07k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
283 | 74.0k | ->get_data()[src_row->_row_pos]; |
284 | 70.0k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { |
285 | 70.0k | const auto& col = _tablet_schema->column(cid); |
286 | 34.9k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { |
287 | 34.9k | continue; |
288 | 35.0k | } |
289 | 35.0k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
290 | 35.0k | _agg_functions[cid]->add(dst_row->agg_places(cid), |
291 | 35.0k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
292 | 35.0k | src_row->_row_pos, _arena.get()); |
293 | 4.07k | } |
294 | 98.6k | } _ZN5doris8MemTable27_aggregate_two_row_in_blockILb0EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 251 | 94.6k | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, | 252 | | RowInBlock* src_row, RowInBlock* dst_row) { | 253 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 254 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 255 | 94.6k | // sequence column | 256 | 170 | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 257 | 170 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 258 | 170 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 259 | | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 260 | 170 | // dst sequence column larger than src, don't need to update | 261 | 88 | if (res > 0) { | 262 | 88 | return; | 263 | | } | 264 | | // need to update the row pos in dst row to the src row pos when has | 265 | 82 | // sequence column | 266 | 82 | dst_row->_row_pos = src_row->_row_pos; | 267 | | } | 268 | 94.5k | // dst is non-sequence row, or dst sequence is smaller | 269 | 94.4k | if constexpr (!has_skip_bitmap_col) { | 270 | 401k | DCHECK(_skip_bitmap_col_idx == -1); | 271 | 307k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 272 | 307k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 273 | 307k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 274 | 307k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 275 | 307k | src_row->_row_pos, _arena.get()); | 276 | 94.4k | } | 277 | 94.5k | } else { | 278 | 94.5k | DCHECK(_skip_bitmap_col_idx != -1); | 279 | 94.5k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 280 | 94.5k | const BitmapValue& skip_bitmap = | 281 | 94.5k | assert_cast<vectorized::ColumnBitmap*, 
TypeCheckOnRelease::DISABLE>( | 282 | 94.5k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 283 | 94.5k | ->get_data()[src_row->_row_pos]; | 284 | 94.5k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 285 | 94.5k | const auto& col = _tablet_schema->column(cid); | 286 | 94.5k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 287 | 94.5k | continue; | 288 | 94.5k | } | 289 | 94.5k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 290 | 94.5k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 291 | 94.5k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 292 | 94.5k | src_row->_row_pos, _arena.get()); | 293 | 94.5k | } | 294 | 94.5k | } |
_ZN5doris8MemTable27_aggregate_two_row_in_blockILb1EEEvRNS_10vectorized12MutableBlockEPNS_10RowInBlockES6_ Line | Count | Source | 251 | 4.09k | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, | 252 | | RowInBlock* src_row, RowInBlock* dst_row) { | 253 | | // for flexible partial update, the caller must guarantees that either src_row and dst_row | 254 | | // both specify the sequence column, or src_row and dst_row both don't specify the | 255 | 4.09k | // sequence column | 256 | 156 | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { | 257 | 156 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); | 258 | 156 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); | 259 | | auto res = col_ptr->compare_at(dst_row->_row_pos, src_row->_row_pos, *col_ptr, -1); | 260 | 156 | // dst sequence column larger than src, don't need to update | 261 | 18 | if (res > 0) { | 262 | 18 | return; | 263 | | } | 264 | | // need to update the row pos in dst row to the src row pos when has | 265 | 138 | // sequence column | 266 | 138 | dst_row->_row_pos = src_row->_row_pos; | 267 | | } | 268 | 4.07k | // dst is non-sequence row, or dst sequence is smaller | 269 | 4.07k | if constexpr (!has_skip_bitmap_col) { | 270 | 4.07k | DCHECK(_skip_bitmap_col_idx == -1); | 271 | 4.07k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 272 | 4.07k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 273 | 4.07k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 274 | 4.07k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 275 | 4.07k | src_row->_row_pos, _arena.get()); | 276 | 4.07k | } | 277 | 4.07k | } else { | 278 | 4.07k | DCHECK(_skip_bitmap_col_idx != -1); | 279 | 4.07k | DCHECK_LT(_skip_bitmap_col_idx, mutable_block.columns()); | 280 | 4.07k | const BitmapValue& skip_bitmap = | 281 | 4.07k | assert_cast<vectorized::ColumnBitmap*, 
TypeCheckOnRelease::DISABLE>( | 282 | 4.07k | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 283 | 74.0k | ->get_data()[src_row->_row_pos]; | 284 | 70.0k | for (size_t cid = _tablet_schema->num_key_columns(); cid < _num_columns; ++cid) { | 285 | 70.0k | const auto& col = _tablet_schema->column(cid); | 286 | 34.9k | if (cid != _skip_bitmap_col_idx && skip_bitmap.contains(col.unique_id())) { | 287 | 34.9k | continue; | 288 | 35.0k | } | 289 | 35.0k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 290 | 35.0k | _agg_functions[cid]->add(dst_row->agg_places(cid), | 291 | 35.0k | const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 292 | 35.0k | src_row->_row_pos, _arena.get()); | 293 | 4.07k | } | 294 | 4.07k | } |
|
295 | 61.2k | } |
296 | 61.2k | Status MemTable::_put_into_output(vectorized::Block& in_block) { |
297 | 61.2k | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
298 | 61.2k | DorisVector<uint32_t> row_pos_vec; |
299 | 61.2k | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
300 | 36.6M | row_pos_vec.reserve(in_block.rows()); |
301 | 36.5M | for (int i = 0; i < _row_in_blocks->size(); i++) { |
302 | 36.5M | row_pos_vec.emplace_back((*_row_in_blocks)[i]->_row_pos); |
303 | 61.2k | } |
304 | 61.2k | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
305 | 61.2k | row_pos_vec.data() + in_block.rows()); |
306 | | } |
307 | 63.4k | |
308 | 63.4k | size_t MemTable::_sort() { |
309 | 63.4k | SCOPED_RAW_TIMER(&_stat.sort_ns); |
310 | 63.4k | _stat.sort_times++; |
311 | | size_t same_keys_num = 0; |
312 | 63.4k | // sort new rows |
313 | 249k | Tie tie = Tie(_last_sorted_pos, _row_in_blocks->size()); |
314 | 371M | for (size_t i = 0; i < _tablet_schema->num_key_columns(); i++) { |
315 | 371M | auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { |
316 | 371M | return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1); |
317 | 185k | }; |
318 | 185k | _sort_one_column(*_row_in_blocks, tie, cmp); |
319 | 63.4k | } |
320 | | bool is_dup = (_keys_type == KeysType::DUP_KEYS); |
321 | 63.4k | // sort extra round by _row_pos to make the sort stable |
322 | 253k | auto iter = tie.iter(); |
323 | 190k | while (iter.next()) { |
324 | 190k | pdqsort(std::next(_row_in_blocks->begin(), iter.left()), |
325 | 18.6M | std::next(_row_in_blocks->begin(), iter.right()), |
326 | 18.6M | [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { |
327 | 18.6M | return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos; |
328 | 190k | }); |
329 | 190k | same_keys_num += iter.right() - iter.left(); |
330 | | } |
331 | 63.4k | // merge new rows and old rows |
332 | 63.4k | _vec_row_comparator->set_block(&_input_mutable_block); |
333 | 63.4k | auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l, |
334 | 0 | const RowInBlock* r) -> bool { |
335 | 0 | auto value = (*(this->_vec_row_comparator))(l, r); |
336 | 0 | if (value == 0) { |
337 | 0 | same_keys_num++; |
338 | 0 | return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos; |
339 | 0 | } else { |
340 | 0 | return value < 0; |
341 | 0 | } |
342 | 63.4k | }; |
343 | 63.4k | auto new_row_it = std::next(_row_in_blocks->begin(), _last_sorted_pos); |
344 | 63.4k | std::inplace_merge(_row_in_blocks->begin(), new_row_it, _row_in_blocks->end(), cmp_func); |
345 | 63.4k | _last_sorted_pos = _row_in_blocks->size(); |
346 | 63.4k | return same_keys_num; |
347 | | } |
348 | 1.55k | |
349 | 1.55k | Status MemTable::_sort_by_cluster_keys() { |
350 | 1.55k | SCOPED_RAW_TIMER(&_stat.sort_ns); |
351 | | _stat.sort_times++; |
352 | 1.55k | // sort all rows |
353 | 1.55k | vectorized::Block in_block = _output_mutable_block.to_block(); |
354 | 1.55k | vectorized::MutableBlock mutable_block = |
355 | 1.55k | vectorized::MutableBlock::build_mutable_block(&in_block); |
356 | 1.55k | auto clone_block = in_block.clone_without_columns(); |
357 | | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&clone_block); |
358 | 1.55k | |
359 | 1.55k | DorisVector<RowInBlock*> row_in_blocks; |
360 | 1.55k | std::unique_ptr<int, std::function<void(int*)>> row_in_blocks_deleter((int*)0x01, [&](int*) { |
361 | 1.55k | std::for_each(row_in_blocks.begin(), row_in_blocks.end(), |
362 | 1.55k | std::default_delete<RowInBlock>()); |
363 | 1.55k | }); |
364 | 807k | row_in_blocks.reserve(mutable_block.rows()); |
365 | 806k | for (size_t i = 0; i < mutable_block.rows(); i++) { |
366 | 806k | row_in_blocks.emplace_back(new RowInBlock {i}); |
367 | 1.55k | } |
368 | | Tie tie = Tie(0, mutable_block.rows()); |
369 | 3.78k | |
370 | 3.78k | for (auto cid : _tablet_schema->cluster_key_uids()) { |
371 | 3.78k | auto index = _tablet_schema->field_index(cid); |
372 | 0 | if (index == -1) { |
373 | 0 | return Status::InternalError("could not find cluster key column with unique_id=" + |
374 | 0 | std::to_string(cid) + " in tablet schema"); |
375 | 8.47M | } |
376 | 8.47M | auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int { |
377 | 8.47M | return mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, index, -1); |
378 | 3.78k | }; |
379 | 3.78k | _sort_one_column(row_in_blocks, tie, cmp); |
380 | | } |
381 | | |
382 | 1.55k | // sort extra round by _row_pos to make the sort stable |
383 | 119k | auto iter = tie.iter(); |
384 | 118k | while (iter.next()) { |
385 | 118k | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
386 | 848k | std::next(row_in_blocks.begin(), iter.right()), |
387 | 848k | [](const RowInBlock* lhs, const RowInBlock* rhs) -> bool { |
388 | 848k | return lhs->_row_pos < rhs->_row_pos; |
389 | 118k | }); |
390 | | } |
391 | 1.55k | |
392 | 1.55k | in_block = mutable_block.to_block(); |
393 | 1.55k | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
394 | 1.55k | DorisVector<uint32_t> row_pos_vec; |
395 | 1.55k | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
396 | 822k | row_pos_vec.reserve(in_block.rows()); |
397 | 820k | for (int i = 0; i < row_in_blocks.size(); i++) { |
398 | 820k | row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); |
399 | 1.55k | } |
400 | 21.5k | std::vector<int> column_offset; |
401 | 20.0k | for (int i = 0; i < _column_offset.size(); ++i) { |
402 | 20.0k | column_offset.emplace_back(i); |
403 | 1.55k | } |
404 | 1.55k | return _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
405 | 1.55k | row_pos_vec.data() + in_block.rows(), &column_offset); |
406 | | } |
407 | | |
408 | 189k | void MemTable::_sort_one_column(DorisVector<RowInBlock*>& row_in_blocks, Tie& tie, |
409 | 189k | std::function<int(const RowInBlock*, const RowInBlock*)> cmp) { |
410 | 6.18M | auto iter = tie.iter(); |
411 | 5.99M | while (iter.next()) { |
412 | 5.99M | pdqsort(std::next(row_in_blocks.begin(), static_cast<int>(iter.left())), |
413 | 291M | std::next(row_in_blocks.begin(), static_cast<int>(iter.right())), |
414 | 5.99M | [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; }); |
415 | 64.3M | tie[iter.left()] = 0; |
416 | 58.3M | for (auto i = iter.left() + 1; i < iter.right(); i++) { |
417 | 58.3M | tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0); |
418 | 5.99M | } |
419 | 189k | } |
420 | | } |
421 | | |
422 | | template <bool is_final> |
423 | | void MemTable::_finalize_one_row(RowInBlock* row, |
424 | 321k | const vectorized::ColumnsWithTypeAndName& block_data, |
425 | | int row_pos) { |
426 | 846k | // move key columns |
427 | 524k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { |
428 | 524k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), |
429 | 524k | row->_row_pos); |
430 | 321k | } |
431 | | if (row->has_init_agg()) { |
432 | 84.9k | // get value columns from agg_places |
433 | 76.6k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
434 | 76.6k | auto function = _agg_functions[i]; |
435 | 76.6k | auto* agg_place = row->agg_places(i); |
436 | 76.6k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
437 | | function->insert_result_into(agg_place, *col_ptr); |
438 | 76.6k |
|
439 | 0 | if constexpr (is_final) { |
440 | 0 | function->destroy(agg_place); |
441 | 0 | } else { |
442 | 0 | function->reset(agg_place); |
443 | 76.6k | } |
444 | | } |
445 | 8.32k |
|
446 | 0 | if constexpr (is_final) { |
447 | 0 | row->remove_init_agg(); |
448 | 0 | } else { |
449 | 0 | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
450 | 0 | auto function = _agg_functions[i]; |
451 | 0 | auto* agg_place = row->agg_places(i); |
452 | 0 | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); |
453 | 0 | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
454 | 0 | row_pos, _arena.get()); |
455 | 0 | } |
456 | 313k | } |
457 | | } else { |
458 | 5.75M | // move columns for rows do not need agg |
459 | 5.43M | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { |
460 | 5.43M | _output_mutable_block.get_column_by_position(i)->insert_from( |
461 | 5.43M | *block_data[i].column.get(), row->_row_pos); |
462 | 313k | } |
463 | 321k | } |
464 | 0 | if constexpr (!is_final) { |
465 | 0 | row->_row_pos = row_pos; |
466 | 321k | } Unexecuted instantiation: _ZN5doris8MemTable17_finalize_one_rowILb0EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi _ZN5doris8MemTable17_finalize_one_rowILb1EEEvPNS_10RowInBlockERKSt6vectorINS_10vectorized21ColumnWithTypeAndNameESaIS6_EEi Line | Count | Source | 424 | 321k | const vectorized::ColumnsWithTypeAndName& block_data, | 425 | | int row_pos) { | 426 | 846k | // move key columns | 427 | 524k | for (size_t i = 0; i < _tablet_schema->num_key_columns(); ++i) { | 428 | 524k | _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), | 429 | 524k | row->_row_pos); | 430 | 321k | } | 431 | | if (row->has_init_agg()) { | 432 | 84.9k | // get value columns from agg_places | 433 | 76.6k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 434 | 76.6k | auto function = _agg_functions[i]; | 435 | 76.6k | auto* agg_place = row->agg_places(i); | 436 | 76.6k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 437 | | function->insert_result_into(agg_place, *col_ptr); | 438 | 76.6k | | 439 | 76.5k | if constexpr (is_final) { | 440 | 76.5k | function->destroy(agg_place); | 441 | 76.6k | } else { | 442 | 76.6k | function->reset(agg_place); | 443 | 76.6k | } | 444 | | } | 445 | 8.32k | | 446 | 8.32k | if constexpr (is_final) { | 447 | 8.32k | row->remove_init_agg(); | 448 | 8.32k | } else { | 449 | 8.32k | for (size_t i = _tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 450 | 8.32k | auto function = _agg_functions[i]; | 451 | 8.32k | auto* agg_place = row->agg_places(i); | 452 | 8.32k | auto* col_ptr = _output_mutable_block.get_column_by_position(i).get(); | 453 | 8.32k | function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 454 | 8.32k | row_pos, _arena.get()); | 455 | 8.32k | } | 456 | 313k | } | 457 | | } else { | 458 | 5.75M | // move columns for rows do not need agg | 459 | 5.43M | for (size_t i = 
_tablet_schema->num_key_columns(); i < _num_columns; ++i) { | 460 | 5.43M | _output_mutable_block.get_column_by_position(i)->insert_from( | 461 | 5.43M | *block_data[i].column.get(), row->_row_pos); | 462 | 313k | } | 463 | 321k | } | 464 | 321k | if constexpr (!is_final) { | 465 | 321k | row->_row_pos = row_pos; | 466 | 321k | } |
|
467 | | } |
468 | | |
469 | 2.09k | template <bool is_final, bool has_skip_bitmap_col> |
470 | 2.09k | void MemTable::_aggregate() { |
471 | 2.09k | SCOPED_RAW_TIMER(&_stat.agg_ns); |
472 | 2.09k | _stat.agg_times++; |
473 | 2.09k | vectorized::Block in_block = _input_mutable_block.to_block(); |
474 | 2.09k | vectorized::MutableBlock mutable_block = |
475 | 2.09k | vectorized::MutableBlock::build_mutable_block(&in_block); |
476 | 2.09k | _vec_row_comparator->set_block(&mutable_block); |
477 | 2.09k | auto& block_data = in_block.get_columns_with_type_and_name(); |
478 | 2.09k | DorisVector<RowInBlock*> temp_row_in_blocks; |
479 | 2.09k | temp_row_in_blocks.reserve(_last_sorted_pos); |
480 | 2.09k | RowInBlock* prev_row = nullptr; |
481 | | int row_pos = -1; |
482 | | //only init agg if needed |
483 | 8.32k | |
484 | 8.32k | auto init_for_agg = [&](RowInBlock* row) { |
485 | 8.32k | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), |
486 | 84.9k | _offsets_of_aggregate_states.data()); |
487 | 76.5k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { |
488 | 76.5k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); |
489 | 76.5k | auto* data = prev_row->agg_places(cid); |
490 | 76.5k | _agg_functions[cid]->create(data); |
491 | 76.5k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
492 | 76.5k | prev_row->_row_pos, _arena.get()); |
493 | 8.32k | } Unexecuted instantiation: _ZZN5doris8MemTable10_aggregateILb0ELb0EEEvvENKUlPNS_10RowInBlockEE_clES3_ Unexecuted instantiation: _ZZN5doris8MemTable10_aggregateILb0ELb1EEEvvENKUlPNS_10RowInBlockEE_clES3_ _ZZN5doris8MemTable10_aggregateILb1ELb0EEEvvENKUlPNS_10RowInBlockEE_clES3_ Line | Count | Source | 483 | 6.82k | | 484 | 6.82k | auto init_for_agg = [&](RowInBlock* row) { | 485 | 6.82k | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 486 | 57.8k | _offsets_of_aggregate_states.data()); | 487 | 51.0k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 488 | 51.0k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 489 | 51.0k | auto* data = prev_row->agg_places(cid); | 490 | 51.0k | _agg_functions[cid]->create(data); | 491 | 51.0k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 492 | 51.0k | prev_row->_row_pos, _arena.get()); | 493 | 6.82k | } |
_ZZN5doris8MemTable10_aggregateILb1ELb1EEEvvENKUlPNS_10RowInBlockEE_clES3_ Line | Count | Source | 483 | 1.50k | | 484 | 1.50k | auto init_for_agg = [&](RowInBlock* row) { | 485 | 1.50k | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 486 | 27.0k | _offsets_of_aggregate_states.data()); | 487 | 25.5k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 488 | 25.5k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 489 | 25.5k | auto* data = prev_row->agg_places(cid); | 490 | 25.5k | _agg_functions[cid]->create(data); | 491 | 25.5k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 492 | 25.5k | prev_row->_row_pos, _arena.get()); | 493 | 1.50k | } |
|
494 | | }; |
495 | 2.09k | |
496 | 420k | if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) { |
497 | 420k | for (RowInBlock* cur_row : *_row_in_blocks) { |
498 | 98.9k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { |
499 | 8.25k | if (!prev_row->has_init_agg()) { |
500 | 8.25k | init_for_agg(prev_row); |
501 | 98.9k | } |
502 | 98.9k | _stat.merged_rows++; |
503 | 321k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); |
504 | 321k | } else { |
505 | 321k | prev_row = cur_row; |
506 | | if (!temp_row_in_blocks.empty()) { |
507 | 319k | // no more rows to merge for prev row, finalize it |
508 | 319k | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
509 | 321k | } |
510 | 321k | temp_row_in_blocks.push_back(prev_row); |
511 | 321k | row_pos++; |
512 | 420k | } |
513 | 2.07k | } |
514 | | if (!temp_row_in_blocks.empty()) { |
515 | 2.07k | // finalize the last low |
516 | 2.07k | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
517 | 2.07k | } |
518 | | } else { |
519 | | // For flexible partial update and the table has sequence column, considering the following situation: |
520 | | // there are multiple rows with the same keys in memtable, some of them specify the sequence column, |
521 | | // some of them don't. We can't do the de-duplication in memtable becasue we can only know the value |
522 | | // of the sequence column of the row which don't specify seqeuence column in SegmentWriter after we |
523 | | // probe the historical data. So at here we can only merge rows that have sequence column together and |
524 | | // merge rows without sequence column together, and finally, perform deduplication on them in SegmentWriter. |
525 | | |
526 | 16 | // !!ATTENTION!!: there may be rows with the same keys after MemTable::_aggregate() in this situation. |
527 | 16 | RowInBlock* row_with_seq_col = nullptr; |
528 | 16 | int row_pos_with_seq = -1; |
529 | 16 | RowInBlock* row_without_seq_col = nullptr; |
530 | | int row_pos_without_seq = -1; |
531 | 80 | |
532 | 80 | auto finalize_rows = [&]() { |
533 | 64 | if (row_with_seq_col != nullptr) { |
534 | 64 | _finalize_one_row<is_final>(row_with_seq_col, block_data, row_pos_with_seq); |
535 | 64 | row_with_seq_col = nullptr; |
536 | 80 | } |
537 | 64 | if (row_without_seq_col != nullptr) { |
538 | 64 | _finalize_one_row<is_final>(row_without_seq_col, block_data, row_pos_without_seq); |
539 | 64 | row_without_seq_col = nullptr; |
540 | 80 | } Unexecuted instantiation: _ZZN5doris8MemTable10_aggregateILb0ELb1EEEvvENKUlvE_clEv _ZZN5doris8MemTable10_aggregateILb1ELb1EEEvvENKUlvE_clEv Line | Count | Source | 531 | 80 | | 532 | 80 | auto finalize_rows = [&]() { | 533 | 64 | if (row_with_seq_col != nullptr) { | 534 | 64 | _finalize_one_row<is_final>(row_with_seq_col, block_data, row_pos_with_seq); | 535 | 64 | row_with_seq_col = nullptr; | 536 | 80 | } | 537 | 64 | if (row_without_seq_col != nullptr) { | 538 | 64 | _finalize_one_row<is_final>(row_without_seq_col, block_data, row_pos_without_seq); | 539 | 64 | row_without_seq_col = nullptr; | 540 | 80 | } |
|
541 | 128 | }; |
542 | 128 | auto add_row = [&](RowInBlock* row, bool with_seq_col) { |
543 | 128 | temp_row_in_blocks.push_back(row); |
544 | 128 | row_pos++; |
545 | 64 | if (with_seq_col) { |
546 | 64 | row_with_seq_col = row; |
547 | 64 | row_pos_with_seq = row_pos; |
548 | 64 | } else { |
549 | 64 | row_without_seq_col = row; |
550 | 64 | row_pos_without_seq = row_pos; |
551 | 128 | } Unexecuted instantiation: _ZZN5doris8MemTable10_aggregateILb0ELb1EEEvvENKUlPNS_10RowInBlockEbE_clES3_b _ZZN5doris8MemTable10_aggregateILb1ELb1EEEvvENKUlPNS_10RowInBlockEbE_clES3_b Line | Count | Source | 541 | 128 | }; | 542 | 128 | auto add_row = [&](RowInBlock* row, bool with_seq_col) { | 543 | 128 | temp_row_in_blocks.push_back(row); | 544 | 128 | row_pos++; | 545 | 64 | if (with_seq_col) { | 546 | 64 | row_with_seq_col = row; | 547 | 64 | row_pos_with_seq = row_pos; | 548 | 64 | } else { | 549 | 64 | row_without_seq_col = row; | 550 | 64 | row_pos_without_seq = row_pos; | 551 | 128 | } |
|
552 | 16 | }; |
553 | 16 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( |
554 | 16 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) |
555 | 284 | ->get_data(); |
556 | 284 | for (auto* cur_row : *_row_in_blocks) { |
557 | 284 | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; |
558 | | bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id); |
559 | | // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same, |
560 | 284 | // choose any of them if it's valid |
561 | 284 | prev_row = (row_with_seq_col == nullptr) ? row_without_seq_col : row_with_seq_col; |
562 | 220 | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) { |
563 | 220 | prev_row = (with_seq_col ? row_with_seq_col : row_without_seq_col); |
564 | 64 | if (prev_row == nullptr) { |
565 | 64 | add_row(cur_row, with_seq_col); |
566 | 64 | continue; |
567 | 156 | } |
568 | 72 | if (!prev_row->has_init_agg()) { |
569 | 72 | init_for_agg(prev_row); |
570 | 156 | } |
571 | 156 | _stat.merged_rows++; |
572 | 156 | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); |
573 | | } else { |
574 | 64 | // no more rows to merge for prev rows, finalize them |
575 | 64 | finalize_rows(); |
576 | 64 | add_row(cur_row, with_seq_col); |
577 | 284 | } |
578 | | } |
579 | 16 | // finalize the last lows |
580 | 16 | finalize_rows(); |
581 | 2.09k | } |
582 | | if constexpr (!is_final) { |
583 | 0 | // if is not final, we collect the agg results to input_block and then continue to insert |
584 | | _input_mutable_block.swap(_output_mutable_block); |
585 | 0 | //TODO(weixang):opt here. |
586 | 0 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); |
587 | 0 | _output_mutable_block = |
588 | 0 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); |
589 | 0 | _output_mutable_block.clear_column_data(); |
590 | 0 | *_row_in_blocks = temp_row_in_blocks; |
591 | 0 | _last_sorted_pos = _row_in_blocks->size(); |
592 | 2.09k | } Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb0EEEvv Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0ELb1EEEvv _ZN5doris8MemTable10_aggregateILb1ELb0EEEvv Line | Count | Source | 469 | 1.91k | template <bool is_final, bool has_skip_bitmap_col> | 470 | 1.91k | void MemTable::_aggregate() { | 471 | 1.91k | SCOPED_RAW_TIMER(&_stat.agg_ns); | 472 | 1.91k | _stat.agg_times++; | 473 | 1.91k | vectorized::Block in_block = _input_mutable_block.to_block(); | 474 | 1.91k | vectorized::MutableBlock mutable_block = | 475 | 1.91k | vectorized::MutableBlock::build_mutable_block(&in_block); | 476 | 1.91k | _vec_row_comparator->set_block(&mutable_block); | 477 | 1.91k | auto& block_data = in_block.get_columns_with_type_and_name(); | 478 | 1.91k | DorisVector<RowInBlock*> temp_row_in_blocks; | 479 | 1.91k | temp_row_in_blocks.reserve(_last_sorted_pos); | 480 | 1.91k | RowInBlock* prev_row = nullptr; | 481 | | int row_pos = -1; | 482 | | //only init agg if needed | 483 | 1.91k | | 484 | 1.91k | auto init_for_agg = [&](RowInBlock* row) { | 485 | 1.91k | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 486 | 1.91k | _offsets_of_aggregate_states.data()); | 487 | 1.91k | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 488 | 1.91k | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 489 | 1.91k | auto* data = prev_row->agg_places(cid); | 490 | 1.91k | _agg_functions[cid]->create(data); | 491 | 1.91k | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 492 | 1.91k | prev_row->_row_pos, _arena.get()); | 493 | 1.91k | } | 494 | | }; | 495 | 1.91k |
| 496 | 414k | if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) { | 497 | 414k | for (RowInBlock* cur_row : *_row_in_blocks) { | 498 | 94.9k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 499 | 6.82k | if (!prev_row->has_init_agg()) { | 500 | 6.82k | init_for_agg(prev_row); | 501 | 94.9k | } | 502 | 94.9k | _stat.merged_rows++; | 503 | 319k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 504 | 319k | } else { | 505 | 319k | prev_row = cur_row; | 506 | | if (!temp_row_in_blocks.empty()) { | 507 | 318k | // no more rows to merge for prev row, finalize it | 508 | 318k | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 509 | 319k | } | 510 | 319k | temp_row_in_blocks.push_back(prev_row); | 511 | 319k | row_pos++; | 512 | 414k | } | 513 | 1.91k | } | 514 | | if (!temp_row_in_blocks.empty()) { | 515 | 1.91k | // finalize the last low | 516 | 1.91k | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 517 | 1.91k | } | 518 | | } else { | 519 | | // For flexible partial update and the table has sequence column, considering the following situation: | 520 | | // there are multiple rows with the same keys in memtable, some of them specify the sequence column, | 521 | | // some of them don't. We can't do the de-duplication in memtable becasue we can only know the value | 522 | | // of the sequence column of the row which don't specify seqeuence column in SegmentWriter after we | 523 | | // probe the historical data. So at here we can only merge rows that have sequence column together and | 524 | | // merge rows without sequence column together, and finally, perform deduplication on them in SegmentWriter. | 525 | |
| 526 | 0 | // !!ATTENTION!!: there may be rows with the same keys after MemTable::_aggregate() in this situation. | 527 | 0 | RowInBlock* row_with_seq_col = nullptr; | 528 | 0 | int row_pos_with_seq = -1; | 529 | 0 | RowInBlock* row_without_seq_col = nullptr; | 530 | | int row_pos_without_seq = -1; | 531 | 0 |
| 532 | 0 | auto finalize_rows = [&]() { | 533 | 0 | if (row_with_seq_col != nullptr) { | 534 | 0 | _finalize_one_row<is_final>(row_with_seq_col, block_data, row_pos_with_seq); | 535 | 0 | row_with_seq_col = nullptr; | 536 | 0 | } | 537 | 0 | if (row_without_seq_col != nullptr) { | 538 | 0 | _finalize_one_row<is_final>(row_without_seq_col, block_data, row_pos_without_seq); | 539 | 0 | row_without_seq_col = nullptr; | 540 | 0 | } | 541 | 0 | }; | 542 | 0 | auto add_row = [&](RowInBlock* row, bool with_seq_col) { | 543 | 0 | temp_row_in_blocks.push_back(row); | 544 | 0 | row_pos++; | 545 | 0 | if (with_seq_col) { | 546 | 0 | row_with_seq_col = row; | 547 | 0 | row_pos_with_seq = row_pos; | 548 | 0 | } else { | 549 | 0 | row_without_seq_col = row; | 550 | 0 | row_pos_without_seq = row_pos; | 551 | 0 | } | 552 | 0 | }; | 553 | 0 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( | 554 | 0 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 555 | 0 | ->get_data(); | 556 | 0 | for (auto* cur_row : *_row_in_blocks) { | 557 | 0 | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; | 558 | | bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id); | 559 | | // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same, | 560 | 0 | // choose any of them if it's valid | 561 | 0 | prev_row = (row_with_seq_col == nullptr) ? row_without_seq_col : row_with_seq_col; | 562 | 0 | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 563 | 0 | prev_row = (with_seq_col ? 
row_with_seq_col : row_without_seq_col); | 564 | 0 | if (prev_row == nullptr) { | 565 | 0 | add_row(cur_row, with_seq_col); | 566 | 0 | continue; | 567 | 0 | } | 568 | 0 | if (!prev_row->has_init_agg()) { | 569 | 0 | init_for_agg(prev_row); | 570 | 0 | } | 571 | 0 | _stat.merged_rows++; | 572 | 0 | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 573 | | } else { | 574 | 0 | // no more rows to merge for prev rows, finalize them | 575 | 0 | finalize_rows(); | 576 | 0 | add_row(cur_row, with_seq_col); | 577 | 0 | } | 578 | | } | 579 | 0 | // finalize the last lows | 580 | 0 | finalize_rows(); | 581 | 1.91k | } | 582 | | if constexpr (!is_final) { | 583 | 1.91k | // if is not final, we collect the agg results to input_block and then continue to insert | 584 | | _input_mutable_block.swap(_output_mutable_block); | 585 | 1.91k | //TODO(weixang):opt here. | 586 | 1.91k | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 587 | 1.91k | _output_mutable_block = | 588 | 1.91k | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 589 | 1.91k | _output_mutable_block.clear_column_data(); | 590 | 1.91k | *_row_in_blocks = temp_row_in_blocks; | 591 | 1.91k | _last_sorted_pos = _row_in_blocks->size(); | 592 | 1.91k | } |
_ZN5doris8MemTable10_aggregateILb1ELb1EEEvv Line | Count | Source | 469 | 174 | template <bool is_final, bool has_skip_bitmap_col> | 470 | 174 | void MemTable::_aggregate() { | 471 | 174 | SCOPED_RAW_TIMER(&_stat.agg_ns); | 472 | 174 | _stat.agg_times++; | 473 | 174 | vectorized::Block in_block = _input_mutable_block.to_block(); | 474 | 174 | vectorized::MutableBlock mutable_block = | 475 | 174 | vectorized::MutableBlock::build_mutable_block(&in_block); | 476 | 174 | _vec_row_comparator->set_block(&mutable_block); | 477 | 174 | auto& block_data = in_block.get_columns_with_type_and_name(); | 478 | 174 | DorisVector<RowInBlock*> temp_row_in_blocks; | 479 | 174 | temp_row_in_blocks.reserve(_last_sorted_pos); | 480 | 174 | RowInBlock* prev_row = nullptr; | 481 | | int row_pos = -1; | 482 | | //only init agg if needed | 483 | 174 | | 484 | 174 | auto init_for_agg = [&](RowInBlock* row) { | 485 | 174 | row->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 486 | 174 | _offsets_of_aggregate_states.data()); | 487 | 174 | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 488 | 174 | auto* col_ptr = mutable_block.mutable_columns()[cid].get(); | 489 | 174 | auto* data = prev_row->agg_places(cid); | 490 | 174 | _agg_functions[cid]->create(data); | 491 | 174 | _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 492 | 174 | prev_row->_row_pos, _arena.get()); | 493 | 174 | } | 494 | | }; | 495 | 174 | | 496 | 5.68k | if (!has_skip_bitmap_col || _seq_col_idx_in_block == -1) { | 497 | 5.68k | for (RowInBlock* cur_row : *_row_in_blocks) { | 498 | 3.94k | if (!temp_row_in_blocks.empty() && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 499 | 1.43k | if (!prev_row->has_init_agg()) { | 500 | 1.43k | init_for_agg(prev_row); | 501 | 3.94k | } | 502 | 3.94k | _stat.merged_rows++; | 503 | 3.94k | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 504 | 
1.74k | } else { | 505 | 1.74k | prev_row = cur_row; | 506 | | if (!temp_row_in_blocks.empty()) { | 507 | 1.58k | // no more rows to merge for prev row, finalize it | 508 | 1.58k | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 509 | 1.74k | } | 510 | 1.74k | temp_row_in_blocks.push_back(prev_row); | 511 | 1.74k | row_pos++; | 512 | 5.68k | } | 513 | 158 | } | 514 | | if (!temp_row_in_blocks.empty()) { | 515 | 158 | // finalize the last low | 516 | 158 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 517 | 158 | } | 518 | | } else { | 519 | | // For flexible partial update and the table has sequence column, considering the following situation: | 520 | | // there are multiple rows with the same keys in memtable, some of them specify the sequence column, | 521 | | // some of them don't. We can't do the de-duplication in memtable becasue we can only know the value | 522 | | // of the sequence column of the row which don't specify seqeuence column in SegmentWriter after we | 523 | | // probe the historical data. So at here we can only merge rows that have sequence column together and | 524 | | // merge rows without sequence column together, and finally, perform deduplication on them in SegmentWriter. | 525 | | | 526 | 16 | // !!ATTENTION!!: there may be rows with the same keys after MemTable::_aggregate() in this situation. 
| 527 | 16 | RowInBlock* row_with_seq_col = nullptr; | 528 | 16 | int row_pos_with_seq = -1; | 529 | 16 | RowInBlock* row_without_seq_col = nullptr; | 530 | | int row_pos_without_seq = -1; | 531 | 16 | | 532 | 16 | auto finalize_rows = [&]() { | 533 | 16 | if (row_with_seq_col != nullptr) { | 534 | 16 | _finalize_one_row<is_final>(row_with_seq_col, block_data, row_pos_with_seq); | 535 | 16 | row_with_seq_col = nullptr; | 536 | 16 | } | 537 | 16 | if (row_without_seq_col != nullptr) { | 538 | 16 | _finalize_one_row<is_final>(row_without_seq_col, block_data, row_pos_without_seq); | 539 | 16 | row_without_seq_col = nullptr; | 540 | 16 | } | 541 | 16 | }; | 542 | 16 | auto add_row = [&](RowInBlock* row, bool with_seq_col) { | 543 | 16 | temp_row_in_blocks.push_back(row); | 544 | 16 | row_pos++; | 545 | 16 | if (with_seq_col) { | 546 | 16 | row_with_seq_col = row; | 547 | 16 | row_pos_with_seq = row_pos; | 548 | 16 | } else { | 549 | 16 | row_without_seq_col = row; | 550 | 16 | row_pos_without_seq = row_pos; | 551 | 16 | } | 552 | 16 | }; | 553 | 16 | auto& skip_bitmaps = assert_cast<vectorized::ColumnBitmap*>( | 554 | 16 | mutable_block.mutable_columns()[_skip_bitmap_col_idx].get()) | 555 | 284 | ->get_data(); | 556 | 284 | for (auto* cur_row : *_row_in_blocks) { | 557 | 284 | const BitmapValue& skip_bitmap = skip_bitmaps[cur_row->_row_pos]; | 558 | | bool with_seq_col = !skip_bitmap.contains(_seq_col_unique_id); | 559 | | // compare keys, the keys of row_with_seq_col and row_with_seq_col is the same, | 560 | 284 | // choose any of them if it's valid | 561 | 284 | prev_row = (row_with_seq_col == nullptr) ? row_without_seq_col : row_with_seq_col; | 562 | 220 | if (prev_row != nullptr && (*_vec_row_comparator)(prev_row, cur_row) == 0) { | 563 | 220 | prev_row = (with_seq_col ? 
row_with_seq_col : row_without_seq_col); | 564 | 64 | if (prev_row == nullptr) { | 565 | 64 | add_row(cur_row, with_seq_col); | 566 | 64 | continue; | 567 | 156 | } | 568 | 72 | if (!prev_row->has_init_agg()) { | 569 | 72 | init_for_agg(prev_row); | 570 | 156 | } | 571 | 156 | _stat.merged_rows++; | 572 | 156 | _aggregate_two_row_in_block<has_skip_bitmap_col>(mutable_block, cur_row, prev_row); | 573 | | } else { | 574 | 64 | // no more rows to merge for prev rows, finalize them | 575 | 64 | finalize_rows(); | 576 | 64 | add_row(cur_row, with_seq_col); | 577 | 284 | } | 578 | | } | 579 | 16 | // finalize the last lows | 580 | 16 | finalize_rows(); | 581 | 174 | } | 582 | | if constexpr (!is_final) { | 583 | 174 | // if is not final, we collect the agg results to input_block and then continue to insert | 584 | | _input_mutable_block.swap(_output_mutable_block); | 585 | 174 | //TODO(weixang):opt here. | 586 | 174 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 587 | 174 | _output_mutable_block = | 588 | 174 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 589 | 174 | _output_mutable_block.clear_column_data(); | 590 | 174 | *_row_in_blocks = temp_row_in_blocks; | 591 | 174 | _last_sorted_pos = _row_in_blocks->size(); | 592 | 174 | } |
|
593 | | } |
594 | 0 |
|
595 | 0 | void MemTable::shrink_memtable_by_agg() { |
596 | 0 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
597 | 0 | _resource_ctx->memory_context()->mem_tracker()->write_tracker()); |
598 | 0 | SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); |
599 | 0 | if (_keys_type == KeysType::DUP_KEYS) { |
600 | 0 | return; |
601 | 0 | } |
602 | 0 | size_t same_keys_num = _sort(); |
603 | 0 | if (same_keys_num != 0) { |
604 | 0 | (_skip_bitmap_col_idx == -1) ? _aggregate<false, false>() : _aggregate<false, true>(); |
605 | 0 | } |
606 | | } |
607 | 137k | |
608 | 137k | bool MemTable::need_flush() const { |
609 | 137k | DBUG_EXECUTE_IF("MemTable.need_flush", { return true; }); |
610 | 137k | auto max_size = config::write_buffer_size; |
611 | 949 | if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
612 | 949 | auto update_columns_size = _num_columns; |
613 | 949 | max_size = max_size * update_columns_size / _tablet_schema->num_columns(); |
614 | 949 | max_size = max_size > 1048576 ? max_size : 1048576; |
615 | 137k | } |
616 | 137k | return memory_usage() >= max_size; |
617 | | } |
618 | 137k | |
619 | 137k | bool MemTable::need_agg() const { |
620 | 4.34k | if (_keys_type == KeysType::AGG_KEYS) { |
621 | 4.34k | auto max_size = config::write_buffer_size_for_agg; |
622 | 4.34k | return memory_usage() >= max_size; |
623 | 133k | } |
624 | 137k | return false; |
625 | | } |
626 | 63.4k | |
627 | 63.4k | size_t MemTable::get_flush_reserve_memory_size() const { |
628 | 22 | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
629 | 22 | return 0; // no need to reserve |
630 | 63.4k | } |
631 | 63.4k | return static_cast<size_t>(static_cast<double>(_input_mutable_block.allocated_bytes()) * 1.2); |
632 | | } |
633 | 63.4k | |
634 | 63.4k | Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) { |
635 | 63.4k | size_t same_keys_num = _sort(); |
636 | 61.3k | if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { |
637 | 22 | if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) { |
638 | 61.3k | _output_mutable_block.swap(_input_mutable_block); |
639 | 61.3k | } else { |
640 | 61.3k | vectorized::Block in_block = _input_mutable_block.to_block(); |
641 | 61.3k | RETURN_IF_ERROR(_put_into_output(in_block)); |
642 | 61.3k | } |
643 | 2.09k | } else { |
644 | 2.09k | (_skip_bitmap_col_idx == -1) ? _aggregate<true, false>() : _aggregate<true, true>(); |
645 | 63.4k | } |
646 | 63.4k | if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow && |
647 | 1.55k | !_tablet_schema->cluster_key_uids().empty()) { |
648 | 0 | if (_partial_update_mode != UniqueKeyUpdateModePB::UPSERT) { |
649 | 0 | return Status::InternalError( |
650 | 0 | "Partial update for mow with cluster keys is not supported"); |
651 | 1.55k | } |
652 | 1.55k | RETURN_IF_ERROR(_sort_by_cluster_keys()); |
653 | 63.4k | } |
654 | 63.4k | _input_mutable_block.clear(); |
655 | 63.4k | *res = vectorized::Block::create_unique(_output_mutable_block.to_block()); |
656 | 63.4k | return Status::OK(); |
657 | | } |
658 | 63.4k | |
// Public entry point for converting the memtable into its output block: delegates to
// _to_block(res) and returns Status::OK() when that call succeeds.
// NOTE(review): RETURN_IF_ERROR_OR_CATCH_EXCEPTION presumably returns early on a non-OK status
// and turns exceptions thrown by _to_block() into a Status - confirm against the macro.
659 | 63.4k | Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) { |
660 | 63.4k | RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res)); |
661 | 63.4k | return Status::OK(); |
662 | | } |
663 | | |
664 | | #include "common/compile_check_end.h" |
665 | | } // namespace doris |