/root/doris/be/src/pipeline/shuffle/writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "writer.h" |
19 | | |
20 | | #include <type_traits> |
21 | | |
22 | | #include "pipeline/exec/exchange_sink_operator.h" |
23 | | #include "vec/core/block.h" |
24 | | |
25 | | namespace doris::pipeline { |
26 | | #include "common/compile_check_begin.h" |
27 | | template <typename ChannelPtrType> |
28 | 0 | void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { |
29 | 0 | channel->set_receiver_eof(st); |
30 | | // Chanel will not send RPC to the downstream when eof, so close chanel by OK status. |
31 | 0 | static_cast<void>(channel->close(state)); |
32 | 0 | } |
33 | | |
34 | | Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state, |
35 | 0 | vectorized::Block* block, bool eos) { |
36 | 0 | bool already_sent = false; |
37 | 0 | { |
38 | 0 | SCOPED_TIMER(local_state->split_block_hash_compute_timer()); |
39 | 0 | RETURN_IF_ERROR( |
40 | 0 | local_state->partitioner()->do_partitioning(state, block, eos, &already_sent)); |
41 | 0 | } |
42 | 0 | if (already_sent) { |
43 | | // The same block may be sent twice by TabletSinkHashPartitioner. To get the correct |
44 | | // result, we should not send any rows the last time. |
45 | 0 | return Status::OK(); |
46 | 0 | } |
47 | 0 | auto rows = block->rows(); |
48 | 0 | { |
49 | 0 | SCOPED_TIMER(local_state->distribute_rows_into_channels_timer()); |
50 | 0 | const auto& channel_filed = local_state->partitioner()->get_channel_ids(); |
51 | 0 | if (channel_filed.len == sizeof(uint32_t)) { |
52 | 0 | RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, |
53 | 0 | local_state->channels.size(), |
54 | 0 | channel_filed.get<uint32_t>(), rows, block, eos)); |
55 | 0 | } else { |
56 | 0 | RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, |
57 | 0 | local_state->channels.size(), |
58 | 0 | channel_filed.get<int64_t>(), rows, block, eos)); |
59 | 0 | } |
60 | 0 | } |
61 | 0 | return Status::OK(); |
62 | 0 | } |
63 | | |
64 | | template <typename ChannelIdType> |
65 | | Status Writer::_channel_add_rows(RuntimeState* state, |
66 | | std::vector<std::shared_ptr<vectorized::Channel>>& channels, |
67 | | size_t partition_count, |
68 | | const ChannelIdType* __restrict channel_ids, size_t rows, |
69 | 0 | vectorized::Block* block, bool eos) { |
70 | 0 | _row_idx.resize(rows); |
71 | 0 | { |
72 | 0 | _partition_rows_histogram.resize(partition_count); |
73 | 0 | _channel_start_offsets.resize(partition_count); |
74 | 0 | for (size_t i = 0; i < partition_count; ++i) { |
75 | 0 | _partition_rows_histogram[i] = 0; |
76 | 0 | } |
77 | 0 | for (size_t i = 0; i < rows; ++i) { |
78 | 0 | _partition_rows_histogram[channel_ids[i]]++; |
79 | 0 | } |
80 | 0 | _channel_start_offsets[0] = 0; |
81 | 0 | for (size_t i = 1; i < partition_count; ++i) { |
82 | 0 | _channel_start_offsets[i] = |
83 | 0 | _channel_start_offsets[i - 1] + _partition_rows_histogram[i - 1]; |
84 | 0 | } |
85 | 0 | for (uint32_t i = 0; i < rows; i++) { |
86 | 0 | if constexpr (std::is_signed_v<ChannelIdType>) { |
87 | | // -1 means this row is filtered by table sink hash partitioner |
88 | 0 | if (channel_ids[i] == -1) { |
89 | 0 | continue; |
90 | 0 | } |
91 | 0 | } |
92 | 0 | _row_idx[_channel_start_offsets[channel_ids[i]]++] = i; |
93 | 0 | } |
94 | 0 | } |
95 | 0 | Status status = Status::OK(); |
96 | 0 | uint32_t offset = 0; |
97 | 0 | for (size_t i = 0; i < partition_count; ++i) { |
98 | 0 | uint32_t size = _partition_rows_histogram[i]; |
99 | 0 | if (!channels[i]->is_receiver_eof() && size > 0) { |
100 | 0 | status = channels[i]->add_rows(block, _row_idx.data(), offset, size, false); |
101 | 0 | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
102 | 0 | } |
103 | 0 | offset += size; |
104 | 0 | } |
105 | 0 | if (eos) { |
106 | 0 | for (int i = 0; i < partition_count; ++i) { |
107 | 0 | if (!channels[i]->is_receiver_eof()) { |
108 | 0 | status = channels[i]->add_rows(block, _row_idx.data(), 0, 0, true); |
109 | 0 | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
110 | 0 | } |
111 | 0 | } |
112 | 0 | } |
113 | 0 | return Status::OK(); |
114 | 0 | } Unexecuted instantiation: _ZN5doris8pipeline6Writer17_channel_add_rowsIjEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEb Unexecuted instantiation: _ZN5doris8pipeline6Writer17_channel_add_rowsIlEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEb |
115 | | |
116 | | } // namespace doris::pipeline |