/root/doris/be/src/olap/memtable.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/memtable.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <pdqsort.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <cstddef> |
26 | | #include <limits> |
27 | | #include <shared_mutex> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/config.h" |
33 | | #include "common/consts.h" |
34 | | #include "common/logging.h" |
35 | | #include "olap/olap_define.h" |
36 | | #include "olap/rowset/beta_rowset.h" |
37 | | #include "olap/rowset/rowset_writer.h" |
38 | | #include "olap/rowset/segment_v2/segment.h" |
39 | | #include "olap/schema.h" |
40 | | #include "olap/schema_change.h" |
41 | | #include "olap/tablet_schema.h" |
42 | | #include "runtime/descriptors.h" |
43 | | #include "runtime/exec_env.h" |
44 | | #include "runtime/load_channel_mgr.h" |
45 | | #include "runtime/thread_context.h" |
46 | | #include "tablet_meta.h" |
47 | | #include "util/doris_metrics.h" |
48 | | #include "util/runtime_profile.h" |
49 | | #include "util/stopwatch.hpp" |
50 | | #include "util/string_util.h" |
51 | | #include "vec/aggregate_functions/aggregate_function_reader.h" |
52 | | #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
53 | | #include "vec/columns/column.h" |
54 | | #include "vec/columns/column_object.h" |
55 | | #include "vec/columns/column_string.h" |
56 | | #include "vec/common/assert_cast.h" |
57 | | #include "vec/common/schema_util.h" |
58 | | #include "vec/core/column_with_type_and_name.h" |
59 | | #include "vec/data_types/data_type.h" |
60 | | #include "vec/data_types/data_type_factory.hpp" |
61 | | #include "vec/json/path_in_data.h" |
62 | | #include "vec/jsonb/serialize.h" |
63 | | |
64 | | namespace doris { |
65 | | using namespace ErrorCode; |
66 | | |
67 | | MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* tablet_schema, |
68 | | const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc, |
69 | | RowsetWriter* rowset_writer, std::shared_ptr<MowContext> mow_context, |
70 | | PartialUpdateInfo* partial_update_info, |
71 | | const std::shared_ptr<MemTracker>& insert_mem_tracker, |
72 | | const std::shared_ptr<MemTracker>& flush_mem_tracker) |
73 | | : _tablet(std::move(tablet)), |
74 | | _keys_type(_tablet->keys_type()), |
75 | | _schema(schema), |
76 | | _tablet_schema(tablet_schema), |
77 | | _insert_mem_tracker(insert_mem_tracker), |
78 | | _flush_mem_tracker(flush_mem_tracker), |
79 | | _schema_size(_schema->schema_size()), |
80 | | _rowset_writer(rowset_writer), |
81 | | _is_first_insertion(true), |
82 | | _agg_functions(schema->num_columns()), |
83 | | _offsets_of_aggregate_states(schema->num_columns()), |
84 | | _total_size_of_aggregate_states(0), |
85 | | _mem_usage(0), |
86 | 8 | _mow_context(mow_context) { |
87 | | #ifndef BE_TEST |
88 | | _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( |
89 | | fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id())), |
90 | | ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); |
91 | | #else |
92 | 8 | _insert_mem_tracker_use_hook = std::make_unique<MemTracker>( |
93 | 8 | fmt::format("MemTableHookInsert:TabletId={}", std::to_string(tablet_id()))); |
94 | 8 | #endif |
95 | 8 | _arena = std::make_unique<vectorized::Arena>(); |
96 | 8 | _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema); |
97 | | // TODO: Support ZOrderComparator in the future |
98 | 8 | _init_columns_offset_by_slot_descs(slot_descs, tuple_desc); |
99 | 8 | _num_columns = _tablet_schema->num_columns(); |
100 | 8 | if (partial_update_info != nullptr) { |
101 | 8 | _is_partial_update = partial_update_info->is_partial_update; |
102 | 8 | if (_is_partial_update) { |
103 | 0 | _num_columns = partial_update_info->partial_update_input_columns.size(); |
104 | 0 | } |
105 | 8 | } |
106 | 8 | } |
107 | | void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs, |
108 | 8 | const TupleDescriptor* tuple_desc) { |
109 | 71 | for (auto slot_desc : *slot_descs) { |
110 | 71 | const auto& slots = tuple_desc->slots(); |
111 | 581 | for (int j = 0; j < slots.size(); ++j) { |
112 | 581 | if (slot_desc->id() == slots[j]->id()) { |
113 | 71 | _column_offset.emplace_back(j); |
114 | 71 | break; |
115 | 71 | } |
116 | 581 | } |
117 | 71 | } |
118 | 8 | } |
119 | | |
120 | 6 | void MemTable::_init_agg_functions(const vectorized::Block* block) { |
121 | 30 | for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) { |
122 | 24 | vectorized::AggregateFunctionPtr function; |
123 | 24 | if (_keys_type == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { |
124 | | // In such table, non-key column's aggregation type is NONE, so we need to construct |
125 | | // the aggregate function manually. |
126 | 6 | function = vectorized::AggregateFunctionSimpleFactory::instance().get( |
127 | 6 | "replace_load", {block->get_data_type(cid)}, |
128 | 6 | block->get_data_type(cid)->is_nullable()); |
129 | 18 | } else { |
130 | 18 | function = |
131 | 18 | _tablet_schema->column(cid).get_aggregate_function(vectorized::AGG_LOAD_SUFFIX); |
132 | 18 | if (function == nullptr) { |
133 | 0 | LOG(WARNING) << "column get aggregate function failed, column=" |
134 | 0 | << _tablet_schema->column(cid).name(); |
135 | 0 | } |
136 | 18 | } |
137 | | |
138 | 24 | DCHECK(function != nullptr); |
139 | 24 | _agg_functions[cid] = function; |
140 | 24 | } |
141 | | |
142 | 30 | for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) { |
143 | 24 | _offsets_of_aggregate_states[cid] = _total_size_of_aggregate_states; |
144 | 24 | _total_size_of_aggregate_states += _agg_functions[cid]->size_of_data(); |
145 | | |
146 | | // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. |
147 | 24 | if (cid + 1 < _num_columns) { |
148 | 18 | size_t alignment_of_next_state = _agg_functions[cid + 1]->align_of_data(); |
149 | | |
150 | | /// Extend total_size to next alignment requirement |
151 | | /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. |
152 | 18 | _total_size_of_aggregate_states = |
153 | 18 | (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
154 | 18 | alignment_of_next_state * alignment_of_next_state; |
155 | 18 | } |
156 | 24 | } |
157 | 6 | } |
158 | | |
159 | 8 | MemTable::~MemTable() { |
160 | 8 | if (_keys_type != KeysType::DUP_KEYS) { |
161 | 17 | for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) { |
162 | 9 | if (!(*it)->has_init_agg()) { |
163 | 9 | continue; |
164 | 9 | } |
165 | | // We should release agg_places here, because they are not released when a |
166 | | // load is canceled. |
167 | 0 | for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) { |
168 | 0 | auto function = _agg_functions[i]; |
169 | 0 | DCHECK(function != nullptr); |
170 | 0 | function->destroy((*it)->agg_places(i)); |
171 | 0 | } |
172 | 0 | } |
173 | 8 | } |
174 | 8 | std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>()); |
175 | 8 | _insert_mem_tracker->release(_mem_usage); |
176 | 8 | _flush_mem_tracker->set_consumption(0); |
177 | 8 | DCHECK_EQ(_insert_mem_tracker->consumption(), 0) |
178 | 0 | << std::endl |
179 | 0 | << MemTracker::log_usage(_insert_mem_tracker->make_snapshot()); |
180 | 8 | DCHECK_EQ(_flush_mem_tracker->consumption(), 0); |
181 | 8 | } |
182 | | |
183 | 1 | int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const { |
184 | 1 | return _pblock->compare_at(left->_row_pos, right->_row_pos, _schema->num_key_columns(), |
185 | 1 | *_pblock, -1); |
186 | 1 | } |
187 | | |
188 | 9 | void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) { |
189 | 9 | SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); |
190 | 9 | vectorized::Block target_block = *input_block; |
191 | 9 | if (!_tablet_schema->is_dynamic_schema()) { |
192 | | // This insert may belong to a rollup tablet, rollup columns is a subset of base table |
193 | | // but for dynamic table, it's need full columns, so input_block should ignore _column_offset |
194 | | // of each column and avoid copy_block |
195 | 9 | target_block = input_block->copy_block(_column_offset); |
196 | 9 | } |
197 | 9 | if (_is_first_insertion) { |
198 | 6 | _is_first_insertion = false; |
199 | 6 | auto cloneBlock = target_block.clone_without_columns(); |
200 | 6 | _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); |
201 | 6 | _vec_row_comparator->set_block(&_input_mutable_block); |
202 | 6 | _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); |
203 | 6 | if (_keys_type != KeysType::DUP_KEYS) { |
204 | 6 | _init_agg_functions(&target_block); |
205 | 6 | } |
206 | 6 | if (_tablet_schema->has_sequence_col()) { |
207 | 5 | if (_is_partial_update) { |
208 | | // for unique key partial update, sequence column index in block |
209 | | // may be different with the index in `_tablet_schema` |
210 | 0 | for (size_t i = 0; i < cloneBlock.columns(); i++) { |
211 | 0 | if (cloneBlock.get_by_position(i).name == SEQUENCE_COL) { |
212 | 0 | _seq_col_idx_in_block = i; |
213 | 0 | break; |
214 | 0 | } |
215 | 0 | } |
216 | 5 | } else { |
217 | 5 | _seq_col_idx_in_block = _tablet_schema->sequence_col_idx(); |
218 | 5 | } |
219 | 5 | } |
220 | 6 | } |
221 | | |
222 | 9 | auto num_rows = row_idxs.size(); |
223 | 9 | size_t cursor_in_mutableblock = _input_mutable_block.rows(); |
224 | 9 | _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows); |
225 | 9 | size_t input_size = target_block.allocated_bytes() * num_rows / target_block.rows(); |
226 | 9 | _mem_usage += input_size; |
227 | 9 | _insert_mem_tracker->consume(input_size); |
228 | 18 | for (int i = 0; i < num_rows; i++) { |
229 | 9 | _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i}); |
230 | 9 | } |
231 | | |
232 | 9 | _stat.raw_rows += num_rows; |
233 | 9 | } |
234 | | |
235 | | void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, |
236 | 1 | RowInBlock* new_row, RowInBlock* row_in_skiplist) { |
237 | 1 | if (_tablet_schema->has_sequence_col() && _seq_col_idx_in_block >= 0) { |
238 | 1 | DCHECK_LT(_seq_col_idx_in_block, mutable_block.columns()); |
239 | 1 | auto col_ptr = mutable_block.mutable_columns()[_seq_col_idx_in_block].get(); |
240 | 1 | auto res = col_ptr->compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, *col_ptr, -1); |
241 | | // dst sequence column larger than src, don't need to update |
242 | 1 | if (res > 0) { |
243 | 1 | return; |
244 | 1 | } |
245 | | // need to update the row pos in skiplist to the new row pos when has |
246 | | // sequence column |
247 | 0 | row_in_skiplist->_row_pos = new_row->_row_pos; |
248 | 0 | } |
249 | | // dst is non-sequence row, or dst sequence is smaller |
250 | 0 | for (uint32_t cid = _schema->num_key_columns(); cid < _num_columns; ++cid) { |
251 | 0 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); |
252 | 0 | _agg_functions[cid]->add(row_in_skiplist->agg_places(cid), |
253 | 0 | const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
254 | 0 | new_row->_row_pos, _arena.get()); |
255 | 0 | } |
256 | 0 | } |
257 | 5 | void MemTable::_put_into_output(vectorized::Block& in_block) { |
258 | 5 | SCOPED_RAW_TIMER(&_stat.put_into_output_ns); |
259 | 5 | std::vector<int> row_pos_vec; |
260 | 5 | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
261 | 5 | row_pos_vec.reserve(in_block.rows()); |
262 | 12 | for (int i = 0; i < _row_in_blocks.size(); i++) { |
263 | 7 | row_pos_vec.emplace_back(_row_in_blocks[i]->_row_pos); |
264 | 7 | } |
265 | 5 | _output_mutable_block.add_rows(&in_block, row_pos_vec.data(), |
266 | 5 | row_pos_vec.data() + in_block.rows()); |
267 | 5 | } |
268 | | |
// Sorts the rows appended since the previous call (positions [_last_sorted_pos, end) of
// _row_in_blocks) by the key columns, then merges them into the already sorted prefix
// [0, _last_sorted_pos). Rows with equal keys are ordered by _row_pos: descending for DUP_KEYS,
// ascending otherwise.
//
// Returns a non-zero value iff rows with equal keys were encountered. The value is not an exact
// duplicate count: tie ranges add their full length, and the merge comparator increments once per
// equal comparison. The callers in this file only test it against 0.
size_t MemTable::_sort() {
    SCOPED_RAW_TIMER(&_stat.sort_ns);
    _stat.sort_times++;
    size_t same_keys_num = 0;
    // sort new rows
    // NOTE(review): Tie presumably starts with [_last_sorted_pos, size) as a single tied range so
    // the first key column sorts all new rows; confirm against Tie's definition.
    Tie tie = Tie(_last_sorted_pos, _row_in_blocks.size());
    for (size_t i = 0; i < _schema->num_key_columns(); i++) {
        // Compare only key column i; each pass refines the ranges still tied on columns < i.
        auto cmp = [&](const RowInBlock* lhs, const RowInBlock* rhs) -> int {
            return _input_mutable_block.compare_one_column(lhs->_row_pos, rhs->_row_pos, i, -1);
        };
        _sort_one_column(_row_in_blocks, tie, cmp);
    }
    bool is_dup = (_keys_type == KeysType::DUP_KEYS);
    // sort extra round by _row_pos to make the sort stable
    // (for DUP_KEYS the order within a tie is by descending _row_pos, i.e. newest first)
    auto iter = tie.iter();
    while (iter.next()) {
        pdqsort(std::next(_row_in_blocks.begin(), iter.left()),
                std::next(_row_in_blocks.begin(), iter.right()),
                [&is_dup](const RowInBlock* lhs, const RowInBlock* rhs) -> bool {
                    return is_dup ? lhs->_row_pos > rhs->_row_pos : lhs->_row_pos < rhs->_row_pos;
                });
        // Any remaining tie range means at least two new rows share all key values.
        same_keys_num += iter.right() - iter.left();
    }
    // merge new rows and old rows
    _vec_row_comparator->set_block(&_input_mutable_block);
    // Full-key comparator for the merge; equal keys are counted and then ordered by _row_pos
    // with the same direction as above.
    auto cmp_func = [this, is_dup, &same_keys_num](const RowInBlock* l,
                                                   const RowInBlock* r) -> bool {
        auto value = (*(this->_vec_row_comparator))(l, r);
        if (value == 0) {
            same_keys_num++;
            return is_dup ? l->_row_pos > r->_row_pos : l->_row_pos < r->_row_pos;
        } else {
            return value < 0;
        }
    };
    auto new_row_it = std::next(_row_in_blocks.begin(), _last_sorted_pos);
    std::inplace_merge(_row_in_blocks.begin(), new_row_it, _row_in_blocks.end(), cmp_func);
    // Everything is sorted now; the next call only needs to handle rows appended after this.
    _last_sorted_pos = _row_in_blocks.size();
    return same_keys_num;
}
309 | | |
310 | | void MemTable::_sort_one_column(std::vector<RowInBlock*>& row_in_blocks, Tie& tie, |
311 | 21 | std::function<int(const RowInBlock*, const RowInBlock*)> cmp) { |
312 | 21 | auto iter = tie.iter(); |
313 | 25 | while (iter.next()) { |
314 | 4 | pdqsort(std::next(row_in_blocks.begin(), iter.left()), |
315 | 4 | std::next(row_in_blocks.begin(), iter.right()), |
316 | 4 | [&cmp](auto lhs, auto rhs) -> bool { return cmp(lhs, rhs) < 0; }); |
317 | 4 | tie[iter.left()] = 0; |
318 | 8 | for (int i = iter.left() + 1; i < iter.right(); i++) { |
319 | 4 | tie[i] = (cmp(row_in_blocks[i - 1], row_in_blocks[i]) == 0); |
320 | 4 | } |
321 | 4 | } |
322 | 21 | } |
323 | | |
// Appends one output row for `row` to _output_mutable_block.
//
// Key columns are copied from `block_data` at row->_row_pos. Value columns come from the row's
// aggregate states when it has them, otherwise they are copied from `block_data` as well.
//
// is_final == true: each aggregate state is destroyed after its result is written, and the row
//   is marked as having no aggregate states.
// is_final == false: each state is reset and re-seeded with the value just written (output
//   column, index `row_pos`), so later rows can keep aggregating into it. The row's _row_pos is
//   updated to `row_pos`, its position in the output block.
template <bool is_final>
void MemTable::_finalize_one_row(RowInBlock* row,
                                 const vectorized::ColumnsWithTypeAndName& block_data,
                                 int row_pos) {
    // move key columns
    for (size_t i = 0; i < _schema->num_key_columns(); ++i) {
        _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(),
                                                                     row->_row_pos);
    }
    if (row->has_init_agg()) {
        // get value columns from agg_places
        for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
            auto function = _agg_functions[i];
            auto agg_place = row->agg_places(i);
            auto col_ptr = _output_mutable_block.get_column_by_position(i).get();
            function->insert_result_into(agg_place, *col_ptr);
            if constexpr (is_final) {
                function->destroy(agg_place);
            } else {
                // Restart the state from the value just emitted so it can absorb more rows.
                function->reset(agg_place);
                function->add(agg_place, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
                              row_pos, _arena.get());
            }
        }
        if constexpr (is_final) {
            row->remove_init_agg();
        }
    } else {
        // move columns for rows do not need agg
        for (size_t i = _schema->num_key_columns(); i < _num_columns; ++i) {
            _output_mutable_block.get_column_by_position(i)->insert_from(
                    *block_data[i].column.get(), row->_row_pos);
        }
    }
    if constexpr (!is_final) {
        // The output block becomes the next input block, so point the row at its new position.
        row->_row_pos = row_pos;
    }
}
362 | | |
363 | | template <bool is_final> |
364 | 1 | void MemTable::_aggregate() { |
365 | 1 | SCOPED_RAW_TIMER(&_stat.agg_ns); |
366 | 1 | _stat.agg_times++; |
367 | 1 | vectorized::Block in_block = _input_mutable_block.to_block(); |
368 | 1 | vectorized::MutableBlock mutable_block = |
369 | 1 | vectorized::MutableBlock::build_mutable_block(&in_block); |
370 | 1 | _vec_row_comparator->set_block(&mutable_block); |
371 | 1 | auto& block_data = in_block.get_columns_with_type_and_name(); |
372 | 1 | std::vector<RowInBlock*> temp_row_in_blocks; |
373 | 1 | temp_row_in_blocks.reserve(_last_sorted_pos); |
374 | 1 | RowInBlock* prev_row = nullptr; |
375 | 1 | int row_pos = -1; |
376 | | //only init agg if needed |
377 | 3 | for (int i = 0; i < _row_in_blocks.size(); i++) { |
378 | 2 | if (!temp_row_in_blocks.empty() && |
379 | 2 | (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { |
380 | 1 | if (!prev_row->has_init_agg()) { |
381 | 1 | prev_row->init_agg_places( |
382 | 1 | _arena->aligned_alloc(_total_size_of_aggregate_states, 16), |
383 | 1 | _offsets_of_aggregate_states.data()); |
384 | 4 | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { |
385 | 3 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); |
386 | 3 | auto data = prev_row->agg_places(cid); |
387 | 3 | _agg_functions[cid]->create(data); |
388 | 3 | _agg_functions[cid]->add( |
389 | 3 | data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), |
390 | 3 | prev_row->_row_pos, _arena.get()); |
391 | 3 | } |
392 | 1 | } |
393 | 1 | _stat.merged_rows++; |
394 | 1 | _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], prev_row); |
395 | 1 | } else { |
396 | 1 | prev_row = _row_in_blocks[i]; |
397 | 1 | if (!temp_row_in_blocks.empty()) { |
398 | | // no more rows to merge for prev row, finalize it |
399 | 0 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
400 | 0 | } |
401 | 1 | temp_row_in_blocks.push_back(prev_row); |
402 | 1 | row_pos++; |
403 | 1 | } |
404 | 2 | } |
405 | 1 | if (!temp_row_in_blocks.empty()) { |
406 | | // finalize the last low |
407 | 1 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); |
408 | 1 | } |
409 | 1 | if constexpr (!is_final) { |
410 | | // if is not final, we collect the agg results to input_block and then continue to insert |
411 | 0 | size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); |
412 | | // flush will not run here, so will not duplicate `_flush_mem_tracker` |
413 | 0 | _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); |
414 | 0 | _mem_usage = shrunked_after_agg; |
415 | 0 | _input_mutable_block.swap(_output_mutable_block); |
416 | | //TODO(weixang):opt here. |
417 | 0 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); |
418 | 0 | _output_mutable_block = |
419 | 0 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); |
420 | 0 | _output_mutable_block.clear_column_data(); |
421 | 0 | _row_in_blocks = temp_row_in_blocks; |
422 | 0 | _last_sorted_pos = _row_in_blocks.size(); |
423 | 0 | } |
424 | 1 | } Unexecuted instantiation: _ZN5doris8MemTable10_aggregateILb0EEEvv _ZN5doris8MemTable10_aggregateILb1EEEvv Line | Count | Source | 364 | 1 | void MemTable::_aggregate() { | 365 | 1 | SCOPED_RAW_TIMER(&_stat.agg_ns); | 366 | 1 | _stat.agg_times++; | 367 | 1 | vectorized::Block in_block = _input_mutable_block.to_block(); | 368 | 1 | vectorized::MutableBlock mutable_block = | 369 | 1 | vectorized::MutableBlock::build_mutable_block(&in_block); | 370 | 1 | _vec_row_comparator->set_block(&mutable_block); | 371 | 1 | auto& block_data = in_block.get_columns_with_type_and_name(); | 372 | 1 | std::vector<RowInBlock*> temp_row_in_blocks; | 373 | 1 | temp_row_in_blocks.reserve(_last_sorted_pos); | 374 | 1 | RowInBlock* prev_row = nullptr; | 375 | 1 | int row_pos = -1; | 376 | | //only init agg if needed | 377 | 3 | for (int i = 0; i < _row_in_blocks.size(); i++) { | 378 | 2 | if (!temp_row_in_blocks.empty() && | 379 | 2 | (*_vec_row_comparator)(prev_row, _row_in_blocks[i]) == 0) { | 380 | 1 | if (!prev_row->has_init_agg()) { | 381 | 1 | prev_row->init_agg_places( | 382 | 1 | _arena->aligned_alloc(_total_size_of_aggregate_states, 16), | 383 | 1 | _offsets_of_aggregate_states.data()); | 384 | 4 | for (auto cid = _tablet_schema->num_key_columns(); cid < _num_columns; cid++) { | 385 | 3 | auto col_ptr = mutable_block.mutable_columns()[cid].get(); | 386 | 3 | auto data = prev_row->agg_places(cid); | 387 | 3 | _agg_functions[cid]->create(data); | 388 | 3 | _agg_functions[cid]->add( | 389 | 3 | data, const_cast<const doris::vectorized::IColumn**>(&col_ptr), | 390 | 3 | prev_row->_row_pos, _arena.get()); | 391 | 3 | } | 392 | 1 | } | 393 | 1 | _stat.merged_rows++; | 394 | 1 | _aggregate_two_row_in_block(mutable_block, _row_in_blocks[i], prev_row); | 395 | 1 | } else { | 396 | 1 | prev_row = _row_in_blocks[i]; | 397 | 1 | if (!temp_row_in_blocks.empty()) { | 398 | | // no more rows to merge for prev row, finalize it | 399 | 0 | 
_finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 400 | 0 | } | 401 | 1 | temp_row_in_blocks.push_back(prev_row); | 402 | 1 | row_pos++; | 403 | 1 | } | 404 | 2 | } | 405 | 1 | if (!temp_row_in_blocks.empty()) { | 406 | | // finalize the last low | 407 | 1 | _finalize_one_row<is_final>(temp_row_in_blocks.back(), block_data, row_pos); | 408 | 1 | } | 409 | 1 | if constexpr (!is_final) { | 410 | | // if is not final, we collect the agg results to input_block and then continue to insert | 411 | 1 | size_t shrunked_after_agg = _output_mutable_block.allocated_bytes(); | 412 | | // flush will not run here, so will not duplicate `_flush_mem_tracker` | 413 | 1 | _insert_mem_tracker->consume(shrunked_after_agg - _mem_usage); | 414 | 1 | _mem_usage = shrunked_after_agg; | 415 | 1 | _input_mutable_block.swap(_output_mutable_block); | 416 | | //TODO(weixang):opt here. | 417 | 1 | std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0); | 418 | 1 | _output_mutable_block = | 419 | 1 | vectorized::MutableBlock::build_mutable_block(empty_input_block.get()); | 420 | 1 | _output_mutable_block.clear_column_data(); | 421 | 1 | _row_in_blocks = temp_row_in_blocks; | 422 | 1 | _last_sorted_pos = _row_in_blocks.size(); | 423 | 1 | } | 424 | 1 | } |
|
425 | | |
426 | 0 | void MemTable::shrink_memtable_by_agg() { |
427 | 0 | SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); |
428 | 0 | if (_keys_type == KeysType::DUP_KEYS) { |
429 | 0 | return; |
430 | 0 | } |
431 | 0 | size_t same_keys_num = _sort(); |
432 | 0 | if (same_keys_num != 0) { |
433 | 0 | _aggregate<false>(); |
434 | 0 | } |
435 | 0 | } |
436 | | |
437 | 9 | bool MemTable::need_flush() const { |
438 | 9 | auto max_size = config::write_buffer_size; |
439 | 9 | if (_is_partial_update) { |
440 | 0 | auto update_columns_size = _num_columns; |
441 | 0 | max_size = max_size * update_columns_size / _tablet_schema->num_columns(); |
442 | 0 | max_size = max_size > 1048576 ? max_size : 1048576; |
443 | 0 | } |
444 | 9 | return memory_usage() >= max_size; |
445 | 9 | } |
446 | | |
447 | 9 | bool MemTable::need_agg() const { |
448 | 9 | if (_keys_type == KeysType::AGG_KEYS) { |
449 | 1 | auto max_size = config::write_buffer_size_for_agg; |
450 | 1 | return memory_usage() >= max_size; |
451 | 1 | } |
452 | 8 | return false; |
453 | 9 | } |
454 | | |
455 | 6 | Status MemTable::_generate_delete_bitmap(int32_t segment_id) { |
456 | 6 | SCOPED_RAW_TIMER(&_stat.delete_bitmap_ns); |
457 | | // generate delete bitmap, build a tmp rowset and load recent segment |
458 | 6 | if (!_tablet->enable_unique_key_merge_on_write()) { |
459 | 4 | return Status::OK(); |
460 | 4 | } |
461 | | |
462 | 2 | RowsetSharedPtr rowset_ptr; |
463 | 2 | RETURN_IF_ERROR(_rowset_writer->build_tmp(rowset_ptr)); |
464 | 2 | auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
465 | 2 | std::vector<segment_v2::SegmentSharedPtr> segments; |
466 | 2 | RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments)); |
467 | 2 | std::vector<RowsetSharedPtr> specified_rowsets; |
468 | 2 | { |
469 | 2 | std::shared_lock meta_rlock(_tablet->get_header_lock()); |
470 | 2 | specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids); |
471 | 2 | } |
472 | 2 | OlapStopWatch watch; |
473 | 2 | RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset_ptr, segments, specified_rowsets, |
474 | 2 | _mow_context->delete_bitmap, |
475 | 2 | _mow_context->max_version, nullptr)); |
476 | 2 | size_t total_rows = std::accumulate( |
477 | 2 | segments.begin(), segments.end(), 0, |
478 | 2 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); |
479 | 2 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " << tablet_id() |
480 | 2 | << ", rowset_ids: " << _mow_context->rowset_ids.size() |
481 | 2 | << ", cur max_version: " << _mow_context->max_version |
482 | 2 | << ", transaction_id: " << _mow_context->txn_id |
483 | 2 | << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; |
484 | 2 | return Status::OK(); |
485 | 2 | } |
486 | | |
487 | 6 | Status MemTable::flush() { |
488 | 6 | VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id() |
489 | 0 | << ", memsize: " << memory_usage() << ", rows: " << _stat.raw_rows; |
490 | | // For merge_on_write table, it must get all segments in this flush. |
491 | | // The id of new segment is set by the _num_segment of beta_rowset_writer, |
492 | | // and new segment ids is between [atomic_num_segments_before_flush, atomic_num_segments_after_flush), |
493 | | // and use the ids to load segment data file for calc delete bitmap. |
494 | 6 | int64_t duration_ns; |
495 | 6 | SCOPED_RAW_TIMER(&duration_ns); |
496 | 6 | SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush())); |
497 | 6 | _delta_writer_callback(_stat); |
498 | 6 | DorisMetrics::instance()->memtable_flush_total->increment(1); |
499 | 6 | DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); |
500 | 6 | VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id() |
501 | 0 | << ", flushsize: " << _flush_size; |
502 | | |
503 | 6 | return Status::OK(); |
504 | 6 | } |
505 | | |
506 | 6 | Status MemTable::_do_flush() { |
507 | 6 | SCOPED_CONSUME_MEM_TRACKER(_flush_mem_tracker); |
508 | 6 | size_t same_keys_num = _sort(); |
509 | 6 | if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) { |
510 | 5 | if (_keys_type == KeysType::DUP_KEYS && _schema->num_key_columns() == 0) { |
511 | 0 | _output_mutable_block.swap(_input_mutable_block); |
512 | 5 | } else { |
513 | 5 | vectorized::Block in_block = _input_mutable_block.to_block(); |
514 | 5 | _put_into_output(in_block); |
515 | 5 | } |
516 | 5 | } else { |
517 | 1 | _aggregate<true>(); |
518 | 1 | } |
519 | 6 | vectorized::Block block = _output_mutable_block.to_block(); |
520 | 6 | FlushContext ctx; |
521 | 6 | ctx.block = █ |
522 | 6 | if (_tablet_schema->is_dynamic_schema()) { |
523 | | // Unfold variant column |
524 | 0 | RETURN_IF_ERROR(unfold_variant_column(block, &ctx)); |
525 | 0 | } |
526 | 6 | if (!_is_partial_update) { |
527 | 6 | ctx.generate_delete_bitmap = [this](size_t segment_id) { |
528 | 6 | return _generate_delete_bitmap(segment_id); |
529 | 6 | }; |
530 | 6 | } |
531 | 6 | ctx.segment_id = _segment_id; |
532 | 6 | SCOPED_RAW_TIMER(&_stat.segment_writer_ns); |
533 | 6 | RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx)); |
534 | 6 | return Status::OK(); |
535 | 6 | } |
536 | | |
537 | 6 | void MemTable::assign_segment_id() { |
538 | 6 | _segment_id = std::optional<int32_t> {_rowset_writer->allocate_segment_id()}; |
539 | 6 | } |
540 | | |
541 | 0 | Status MemTable::close() { |
542 | 0 | return flush(); |
543 | 0 | } |
544 | | |
545 | 0 | Status MemTable::unfold_variant_column(vectorized::Block& block, FlushContext* ctx) { |
546 | 0 | if (block.rows() == 0) { |
547 | 0 | return Status::OK(); |
548 | 0 | } |
549 | | |
550 | | // Sanitize block to match exactly from the same type of frontend meta |
551 | 0 | vectorized::schema_util::FullBaseSchemaView schema_view; |
552 | 0 | schema_view.table_id = _tablet_schema->table_id(); |
553 | 0 | vectorized::ColumnWithTypeAndName* variant_column = |
554 | 0 | block.try_get_by_name(BeConsts::DYNAMIC_COLUMN_NAME); |
555 | 0 | if (!variant_column) { |
556 | 0 | return Status::OK(); |
557 | 0 | } |
558 | 0 | auto base_column = variant_column->column; |
559 | 0 | vectorized::ColumnObject& object_column = |
560 | 0 | assert_cast<vectorized::ColumnObject&>(base_column->assume_mutable_ref()); |
561 | 0 | if (object_column.empty()) { |
562 | 0 | block.erase(BeConsts::DYNAMIC_COLUMN_NAME); |
563 | 0 | return Status::OK(); |
564 | 0 | } |
565 | 0 | object_column.finalize(); |
566 | | // Has extended columns |
567 | 0 | RETURN_IF_ERROR(vectorized::schema_util::send_fetch_full_base_schema_view_rpc(&schema_view)); |
568 | | // Dynamic Block consists of two parts, dynamic part of columns and static part of columns |
569 | | // static dynamic |
570 | | // | ----- | ------- | |
571 | | // The static ones are original _tablet_schame columns |
572 | 0 | TabletSchemaSPtr flush_schema = std::make_shared<TabletSchema>(*_tablet_schema); |
573 | 0 | vectorized::Block flush_block(std::move(block)); |
574 | | // The dynamic ones are auto generated and extended, append them the the orig_block |
575 | 0 | for (auto& entry : object_column.get_subcolumns()) { |
576 | 0 | const std::string& column_name = entry->path.get_path(); |
577 | 0 | auto column_iter = schema_view.column_name_to_column.find(column_name); |
578 | 0 | if (UNLIKELY(column_iter == schema_view.column_name_to_column.end())) { |
579 | | // Column maybe dropped by light weight schema change DDL |
580 | 0 | continue; |
581 | 0 | } |
582 | 0 | TabletColumn column(column_iter->second); |
583 | 0 | auto data_type = vectorized::DataTypeFactory::instance().create_data_type( |
584 | 0 | column, column.is_nullable()); |
585 | | // Dynamic generated columns does not appear in original tablet schema |
586 | 0 | if (_tablet_schema->field_index(column.name()) < 0) { |
587 | 0 | flush_schema->append_column(column); |
588 | 0 | flush_block.insert({data_type->create_column(), data_type, column.name()}); |
589 | 0 | } |
590 | 0 | } |
591 | | |
592 | | // Ensure column are all present at this schema version.Otherwise there will be some senario: |
593 | | // Load1 -> version(10) with schema [a, b, c, d, e], d & e is new added columns and schema version became 10 |
594 | | // Load2 -> version(10) with schema [a, b, c] and has no extended columns and fetched the schema at version 10 |
595 | | // Load2 will persist meta with [a, b, c] but Load1 will persist meta with [a, b, c, d, e] |
596 | | // So we should make sure that rowset at the same schema version alawys contain the same size of columns. |
597 | | // so that all columns at schema_version is in either _tablet_schema or schema_change_recorder |
598 | 0 | for (const auto& [name, column] : schema_view.column_name_to_column) { |
599 | 0 | if (_tablet_schema->field_index(name) == -1) { |
600 | 0 | const auto& tcolumn = schema_view.column_name_to_column[name]; |
601 | 0 | TabletColumn new_column(tcolumn); |
602 | 0 | _rowset_writer->mutable_schema_change_recorder()->add_extended_columns( |
603 | 0 | column, schema_view.schema_version); |
604 | 0 | } |
605 | 0 | } |
606 | | |
607 | | // Last schema alignment before flush to disk, due to the schema maybe variant before this procedure |
608 | | // Eg. add columnA(INT) -> drop ColumnA -> add ColumnA(Double), then columnA could be type of `Double`, |
609 | | // unfold will cast to Double type |
610 | 0 | RETURN_IF_ERROR(vectorized::schema_util::unfold_object( |
611 | 0 | flush_block.get_position_by_name(BeConsts::DYNAMIC_COLUMN_NAME), flush_block, true)); |
612 | 0 | flush_block.erase(BeConsts::DYNAMIC_COLUMN_NAME); |
613 | 0 | ctx->flush_schema = flush_schema; |
614 | 0 | block.swap(flush_block); |
615 | 0 | return Status::OK(); |
616 | 0 | } |
617 | | |
618 | 0 | void MemTable::serialize_block_to_row_column(vectorized::Block& block) { |
619 | 0 | if (block.rows() == 0) { |
620 | 0 | return; |
621 | 0 | } |
622 | 0 | MonotonicStopWatch watch; |
623 | 0 | watch.start(); |
624 | | // find row column id |
625 | 0 | int row_column_id = 0; |
626 | 0 | for (int i = 0; i < _num_columns; ++i) { |
627 | 0 | if (_tablet_schema->column(i).is_row_store_column()) { |
628 | 0 | row_column_id = i; |
629 | 0 | break; |
630 | 0 | } |
631 | 0 | } |
632 | 0 | if (row_column_id == 0) { |
633 | 0 | return; |
634 | 0 | } |
635 | 0 | vectorized::ColumnString* row_store_column = |
636 | 0 | static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id) |
637 | 0 | .column->assume_mutable_ref() |
638 | 0 | .assume_mutable() |
639 | 0 | .get()); |
640 | 0 | row_store_column->clear(); |
641 | 0 | vectorized::DataTypeSerDeSPtrs serdes = |
642 | 0 | vectorized::create_data_type_serdes(block.get_data_types()); |
643 | 0 | vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column, |
644 | 0 | _tablet_schema->num_columns(), serdes); |
645 | 0 | VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id |
646 | 0 | << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)" |
647 | 0 | << watch.elapsed_time() / 1000; |
648 | 0 | } |
649 | | |
650 | | } // namespace doris |