be/src/exec/partitioner/partitioner.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/partitioner/partitioner.h" |
19 | | |
20 | | #include "common/cast_set.h" |
21 | | #include "common/status.h" |
22 | | #include "core/column/column_const.h" |
23 | | #include "exec/exchange/local_exchange_sink_operator.h" |
24 | | #include "exec/exchange/vdata_stream_sender.h" |
25 | | #include "runtime/thread_context.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | | |
30 | | template <typename ChannelIds> |
31 | 399k | Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const { |
32 | 399k | size_t rows = block->rows(); |
33 | | |
34 | 399k | if (rows > 0) { |
35 | 60.6k | auto column_to_keep = block->columns(); |
36 | | |
37 | 60.6k | int result_size = cast_set<int>(_partition_expr_ctxs.size()); |
38 | 60.6k | std::vector<int> result(result_size); |
39 | | |
40 | 60.6k | _initialize_hash_vals(rows); |
41 | 60.6k | auto* __restrict hashes = _hash_vals.data(); |
42 | 60.6k | RETURN_IF_ERROR(_get_partition_column_result(block, result)); |
43 | 152k | for (int j = 0; j < result_size; ++j) { |
44 | 91.5k | const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); |
45 | 91.5k | if (is_const) { |
46 | 6 | continue; |
47 | 6 | } |
48 | 91.5k | _do_hash(col, hashes, j); |
49 | 91.5k | } |
50 | | |
51 | 55.0M | for (size_t i = 0; i < rows; i++) { |
52 | 54.9M | hashes[i] = ChannelIds()(hashes[i], _partition_count); |
53 | 54.9M | } |
54 | | |
55 | 60.6k | Block::erase_useless_column(block, column_to_keep); |
56 | 60.6k | } |
57 | 399k | return Status::OK(); |
58 | 399k | } _ZNK5doris20Crc32HashPartitionerINS_17ShuffleChannelIdsEE15do_partitioningEPNS_12RuntimeStateEPNS_5BlockE Line | Count | Source | 31 | 21.2k | Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const { | 32 | 21.2k | size_t rows = block->rows(); | 33 | | | 34 | 21.2k | if (rows > 0) { | 35 | 6.74k | auto column_to_keep = block->columns(); | 36 | | | 37 | 6.74k | int result_size = cast_set<int>(_partition_expr_ctxs.size()); | 38 | 6.74k | std::vector<int> result(result_size); | 39 | | | 40 | 6.74k | _initialize_hash_vals(rows); | 41 | 6.74k | auto* __restrict hashes = _hash_vals.data(); | 42 | 6.74k | RETURN_IF_ERROR(_get_partition_column_result(block, result)); | 43 | 14.3k | for (int j = 0; j < result_size; ++j) { | 44 | 7.62k | const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); | 45 | 7.62k | if (is_const) { | 46 | 0 | continue; | 47 | 0 | } | 48 | 7.62k | _do_hash(col, hashes, j); | 49 | 7.62k | } | 50 | | | 51 | 4.63M | for (size_t i = 0; i < rows; i++) { | 52 | 4.62M | hashes[i] = ChannelIds()(hashes[i], _partition_count); | 53 | 4.62M | } | 54 | | | 55 | 6.74k | Block::erase_useless_column(block, column_to_keep); | 56 | 6.74k | } | 57 | 21.2k | return Status::OK(); | 58 | 21.2k | } |
_ZNK5doris20Crc32HashPartitionerINS_24SpillPartitionChannelIdsEE15do_partitioningEPNS_12RuntimeStateEPNS_5BlockE Line | Count | Source | 31 | 8 | Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const { | 32 | 8 | size_t rows = block->rows(); | 33 | | | 34 | 8 | if (rows > 0) { | 35 | 8 | auto column_to_keep = block->columns(); | 36 | | | 37 | 8 | int result_size = cast_set<int>(_partition_expr_ctxs.size()); | 38 | 8 | std::vector<int> result(result_size); | 39 | | | 40 | 8 | _initialize_hash_vals(rows); | 41 | 8 | auto* __restrict hashes = _hash_vals.data(); | 42 | 8 | RETURN_IF_ERROR(_get_partition_column_result(block, result)); | 43 | 16 | for (int j = 0; j < result_size; ++j) { | 44 | 8 | const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); | 45 | 8 | if (is_const) { | 46 | 0 | continue; | 47 | 0 | } | 48 | 8 | _do_hash(col, hashes, j); | 49 | 8 | } | 50 | | | 51 | 3.14M | for (size_t i = 0; i < rows; i++) { | 52 | 3.14M | hashes[i] = ChannelIds()(hashes[i], _partition_count); | 53 | 3.14M | } | 54 | | | 55 | 8 | Block::erase_useless_column(block, column_to_keep); | 56 | 8 | } | 57 | 8 | return Status::OK(); | 58 | 8 | } |
_ZNK5doris20Crc32HashPartitionerINS_26SpillRePartitionChannelIdsEE15do_partitioningEPNS_12RuntimeStateEPNS_5BlockE Line | Count | Source | 31 | 8 | Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const { | 32 | 8 | size_t rows = block->rows(); | 33 | | | 34 | 8 | if (rows > 0) { | 35 | 8 | auto column_to_keep = block->columns(); | 36 | | | 37 | 8 | int result_size = cast_set<int>(_partition_expr_ctxs.size()); | 38 | 8 | std::vector<int> result(result_size); | 39 | | | 40 | 8 | _initialize_hash_vals(rows); | 41 | 8 | auto* __restrict hashes = _hash_vals.data(); | 42 | 8 | RETURN_IF_ERROR(_get_partition_column_result(block, result)); | 43 | 16 | for (int j = 0; j < result_size; ++j) { | 44 | 8 | const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); | 45 | 8 | if (is_const) { | 46 | 0 | continue; | 47 | 0 | } | 48 | 8 | _do_hash(col, hashes, j); | 49 | 8 | } | 50 | | | 51 | 28 | for (size_t i = 0; i < rows; i++) { | 52 | 20 | hashes[i] = ChannelIds()(hashes[i], _partition_count); | 53 | 20 | } | 54 | | | 55 | 8 | Block::erase_useless_column(block, column_to_keep); | 56 | 8 | } | 57 | 8 | return Status::OK(); | 58 | 8 | } |
_ZNK5doris20Crc32HashPartitionerINS_15ShiftChannelIdsEE15do_partitioningEPNS_12RuntimeStateEPNS_5BlockE Line | Count | Source | 31 | 378k | Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const { | 32 | 378k | size_t rows = block->rows(); | 33 | | | 34 | 378k | if (rows > 0) { | 35 | 53.8k | auto column_to_keep = block->columns(); | 36 | | | 37 | 53.8k | int result_size = cast_set<int>(_partition_expr_ctxs.size()); | 38 | 53.8k | std::vector<int> result(result_size); | 39 | | | 40 | 53.8k | _initialize_hash_vals(rows); | 41 | 53.8k | auto* __restrict hashes = _hash_vals.data(); | 42 | 53.8k | RETURN_IF_ERROR(_get_partition_column_result(block, result)); | 43 | 137k | for (int j = 0; j < result_size; ++j) { | 44 | 83.9k | const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); | 45 | 83.9k | if (is_const) { | 46 | 6 | continue; | 47 | 6 | } | 48 | 83.9k | _do_hash(col, hashes, j); | 49 | 83.9k | } | 50 | | | 51 | 47.2M | for (size_t i = 0; i < rows; i++) { | 52 | 47.2M | hashes[i] = ChannelIds()(hashes[i], _partition_count); | 53 | 47.2M | } | 54 | | | 55 | 53.8k | Block::erase_useless_column(block, column_to_keep); | 56 | 53.8k | } | 57 | 378k | return Status::OK(); | 58 | 378k | } |
|
59 | | |
60 | | template <typename ChannelIds> |
61 | | void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column, |
62 | 7.63k | HashValType* __restrict result, int idx) const { |
63 | 7.63k | column->update_crcs_with_value( |
64 | 7.63k | result, _partition_expr_ctxs[idx]->root()->data_type()->get_primitive_type(), |
65 | 7.63k | cast_set<HashValType>(column->size())); |
66 | 7.63k | } _ZNK5doris20Crc32HashPartitionerINS_17ShuffleChannelIdsEE8_do_hashERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EEPji Line | Count | Source | 62 | 7.62k | HashValType* __restrict result, int idx) const { | 63 | 7.62k | column->update_crcs_with_value( | 64 | 7.62k | result, _partition_expr_ctxs[idx]->root()->data_type()->get_primitive_type(), | 65 | 7.62k | cast_set<HashValType>(column->size())); | 66 | 7.62k | } |
_ZNK5doris20Crc32HashPartitionerINS_24SpillPartitionChannelIdsEE8_do_hashERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EEPji Line | Count | Source | 62 | 8 | HashValType* __restrict result, int idx) const { | 63 | 8 | column->update_crcs_with_value( | 64 | 8 | result, _partition_expr_ctxs[idx]->root()->data_type()->get_primitive_type(), | 65 | 8 | cast_set<HashValType>(column->size())); | 66 | 8 | } |
_ZNK5doris20Crc32HashPartitionerINS_26SpillRePartitionChannelIdsEE8_do_hashERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EEPji Line | Count | Source | 62 | 8 | HashValType* __restrict result, int idx) const { | 63 | 8 | column->update_crcs_with_value( | 64 | 8 | result, _partition_expr_ctxs[idx]->root()->data_type()->get_primitive_type(), | 65 | 8 | cast_set<HashValType>(column->size())); | 66 | 8 | } |
Unexecuted instantiation: _ZNK5doris20Crc32HashPartitionerINS_15ShiftChannelIdsEE8_do_hashERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EEPji |
67 | | |
68 | | template <typename ChannelIds> |
69 | | Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state, |
70 | 6.97k | std::unique_ptr<PartitionerBase>& partitioner) { |
71 | 6.97k | auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count); |
72 | 6.97k | partitioner.reset(new_partitioner); |
73 | 6.97k | return _clone_expr_ctxs(state, new_partitioner->_partition_expr_ctxs); |
74 | 6.97k | } _ZN5doris20Crc32HashPartitionerINS_17ShuffleChannelIdsEE5cloneEPNS_12RuntimeStateERSt10unique_ptrINS_15PartitionerBaseESt14default_deleteIS6_EE Line | Count | Source | 70 | 6.96k | std::unique_ptr<PartitionerBase>& partitioner) { | 71 | 6.96k | auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count); | 72 | 6.96k | partitioner.reset(new_partitioner); | 73 | 6.96k | return _clone_expr_ctxs(state, new_partitioner->_partition_expr_ctxs); | 74 | 6.96k | } |
_ZN5doris20Crc32HashPartitionerINS_24SpillPartitionChannelIdsEE5cloneEPNS_12RuntimeStateERSt10unique_ptrINS_15PartitionerBaseESt14default_deleteIS6_EE Line | Count | Source | 70 | 5 | std::unique_ptr<PartitionerBase>& partitioner) { | 71 | 5 | auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count); | 72 | 5 | partitioner.reset(new_partitioner); | 73 | 5 | return _clone_expr_ctxs(state, new_partitioner->_partition_expr_ctxs); | 74 | 5 | } |
_ZN5doris20Crc32HashPartitionerINS_26SpillRePartitionChannelIdsEE5cloneEPNS_12RuntimeStateERSt10unique_ptrINS_15PartitionerBaseESt14default_deleteIS6_EE Line | Count | Source | 70 | 4 | std::unique_ptr<PartitionerBase>& partitioner) { | 71 | 4 | auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count); | 72 | 4 | partitioner.reset(new_partitioner); | 73 | 4 | return _clone_expr_ctxs(state, new_partitioner->_partition_expr_ctxs); | 74 | 4 | } |
Unexecuted instantiation: _ZN5doris20Crc32HashPartitionerINS_15ShiftChannelIdsEE5cloneEPNS_12RuntimeStateERSt10unique_ptrINS_15PartitionerBaseESt14default_deleteIS6_EE |
75 | | |
76 | | void Crc32CHashPartitioner::_do_hash(const ColumnPtr& column, HashValType* __restrict result, |
77 | 83.9k | int idx) const { |
78 | 83.9k | column->update_crc32c_batch(result, nullptr); |
79 | 83.9k | } |
80 | | |
81 | | Status Crc32CHashPartitioner::clone(RuntimeState* state, |
82 | 145k | std::unique_ptr<PartitionerBase>& partitioner) { |
83 | 145k | auto* new_partitioner = new Crc32CHashPartitioner(_partition_count); |
84 | 145k | partitioner.reset(new_partitioner); |
85 | 145k | return _clone_expr_ctxs(state, new_partitioner->_partition_expr_ctxs); |
86 | 145k | } |
87 | | |
88 | | template class Crc32HashPartitioner<ShuffleChannelIds>; |
89 | | template class Crc32HashPartitioner<SpillPartitionChannelIds>; |
90 | | template class Crc32HashPartitioner<SpillRePartitionChannelIds>; |
91 | | |
92 | | } // namespace doris |