be/src/exec/exchange/exchange_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/exchange_writer.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstdint> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/logging.h" |
27 | | #include "common/status.h" |
28 | | #include "core/assert_cast.h" |
29 | | #include "core/block/block.h" |
30 | | #include "exec/operator/exchange_sink_operator.h" |
31 | | #include "exec/sink/tablet_sink_hash_partitioner.h" |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | ExchangeWriterBase::ExchangeWriterBase(ExchangeSinkLocalState& local_state) |
36 | 669k | : _local_state(local_state), _partitioner(local_state.partitioner()) {} |
37 | | |
38 | | template <typename ChannelPtrType> |
39 | | Status ExchangeWriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, |
40 | 98.9k | Status st) const { |
41 | 98.9k | channel->set_receiver_eof(st); |
42 | | // Channel will not send RPC to the downstream when eof, so close channel by OK status. |
43 | 98.9k | return channel->close(state); |
44 | 98.9k | } |
45 | | |
46 | | // NOLINTBEGIN(readability-function-cognitive-complexity) |
47 | | Status ExchangeWriterBase::_add_rows_impl(RuntimeState* state, |
48 | | std::vector<std::shared_ptr<Channel>>& channels, |
49 | 278k | size_t channel_count, Block* block, bool eos) { |
50 | 278k | Status status = Status::OK(); |
51 | 278k | uint32_t offset = 0; |
52 | 3.81M | for (size_t i = 0; i < channel_count; ++i) { |
53 | 3.54M | uint32_t size = _channel_rows_histogram[i]; |
54 | 3.54M | if (!channels[i]->is_receiver_eof() && size > 0) { |
55 | 70.4k | VLOG_DEBUG << fmt::format("partition {} of {}, block:\n{}, start: {}, size: {}", i, |
56 | 0 | channel_count, block->dump_data(), offset, size); |
57 | 70.4k | status = channels[i]->add_rows(block, _origin_row_idx.data(), offset, size, false); |
58 | 70.4k | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
59 | 70.4k | } |
60 | 3.54M | offset += size; |
61 | 3.54M | } |
62 | 278k | if (eos) { |
63 | 3.62M | for (int i = 0; i < channel_count; ++i) { |
64 | 3.36M | if (!channels[i]->is_receiver_eof()) { |
65 | 18.4E | VLOG_DEBUG << fmt::format("EOS partition {} of {}, block:\n{}", i, channel_count, |
66 | 18.4E | block->dump_data()); |
67 | 3.36M | status = channels[i]->add_rows(block, _origin_row_idx.data(), 0, 0, true); |
68 | 3.36M | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
69 | 3.36M | } |
70 | 3.35M | } |
71 | 267k | } |
72 | 278k | return Status::OK(); |
73 | 278k | } |
74 | | // NOLINTEND(readability-function-cognitive-complexity) |
75 | | |
76 | 472 | Status ExchangeOlapWriter::write(RuntimeState* state, Block* block, bool eos) { |
77 | 472 | Block prior_block; |
78 | 472 | auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner); |
79 | 472 | RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(prior_block)); |
80 | 472 | if (!prior_block.empty()) { |
81 | | // prior_block (batching rows) cuts in line, deal it first. |
82 | 0 | RETURN_IF_ERROR(_write_impl(state, &prior_block)); |
83 | 0 | tablet_partitioner->finish_cut_in_line(); |
84 | 0 | } |
85 | | |
86 | 472 | RETURN_IF_ERROR(_write_impl(state, block)); |
87 | | |
88 | | // all data wrote. consider batched rows before eos. |
89 | 472 | if (eos) { |
90 | | // get all batched rows |
91 | 432 | tablet_partitioner->mark_last_block(); |
92 | 432 | Block final_batching_block; |
93 | 432 | RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(final_batching_block)); |
94 | 432 | if (!final_batching_block.empty()) { |
95 | 0 | RETURN_IF_ERROR(_write_impl(state, &final_batching_block, true)); |
96 | 432 | } else { |
97 | | // No batched rows, send empty block with eos signal. |
98 | 432 | Block empty_block = block->clone_empty(); |
99 | 432 | RETURN_IF_ERROR(_write_impl(state, &empty_block, true)); |
100 | 432 | } |
101 | 432 | } |
102 | 472 | return Status::OK(); |
103 | 472 | } |
104 | | |
105 | 906 | Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool eos) { |
106 | 906 | auto rows = block->rows(); |
107 | 906 | auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner); |
108 | 906 | { |
109 | 906 | SCOPED_TIMER(_local_state.split_block_hash_compute_timer()); |
110 | 906 | RETURN_IF_ERROR(tablet_partitioner->do_partitioning(state, block)); |
111 | 906 | } |
112 | 906 | { |
113 | 906 | SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer()); |
114 | 906 | const auto& channel_ids = tablet_partitioner->get_channel_ids(); |
115 | 906 | const auto invalid_val = tablet_partitioner->invalid_sentinel(); |
116 | 906 | DCHECK_EQ(channel_ids.size(), rows); |
117 | | |
118 | | // decrease not sinked rows this time |
119 | 906 | COUNTER_UPDATE(_local_state.rows_input_counter(), |
120 | 906 | -1LL * std::ranges::count(channel_ids, invalid_val)); |
121 | | |
122 | 906 | RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels, |
123 | 906 | _local_state.channels.size(), channel_ids, rows, block, |
124 | 906 | eos, invalid_val)); |
125 | 906 | } |
126 | 906 | return Status::OK(); |
127 | 906 | } |
128 | | |
129 | 277k | Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) { |
130 | 277k | { |
131 | 277k | SCOPED_TIMER(_local_state.split_block_hash_compute_timer()); |
132 | 277k | RETURN_IF_ERROR(_partitioner->do_partitioning(state, block)); |
133 | 277k | } |
134 | 277k | { |
135 | 277k | auto rows = block->rows(); |
136 | 277k | SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer()); |
137 | 277k | const auto& channel_ids = _partitioner->get_channel_ids(); |
138 | | |
139 | 277k | RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels, |
140 | 277k | _local_state.channels.size(), channel_ids, rows, block, |
141 | 277k | eos)); |
142 | 277k | } |
143 | | |
144 | 277k | return Status::OK(); |
145 | 277k | } |
146 | | |
147 | | Status ExchangeOlapWriter::_channel_add_rows(RuntimeState* state, |
148 | | std::vector<std::shared_ptr<Channel>>& channels, |
149 | | size_t channel_count, |
150 | | const std::vector<HashValType>& channel_ids, |
151 | | size_t rows, Block* block, bool eos, |
152 | 915 | HashValType invalid_val) { |
153 | 915 | size_t effective_rows = 0; |
154 | 915 | effective_rows = |
155 | 915 | std::ranges::count_if(channel_ids, [=](int64_t cid) { return cid != invalid_val; }); |
156 | | |
157 | | // row index will skip all skipped rows. |
158 | 915 | _origin_row_idx.resize(effective_rows); |
159 | 915 | _channel_rows_histogram.assign(channel_count, 0U); |
160 | 915 | _channel_pos_offsets.resize(channel_count); |
161 | 985 | for (size_t i = 0; i < rows; ++i) { |
162 | 70 | if (channel_ids[i] == invalid_val) { |
163 | 2 | continue; |
164 | 2 | } |
165 | 68 | auto cid = channel_ids[i]; |
166 | 68 | _channel_rows_histogram[cid]++; |
167 | 68 | } |
168 | 915 | _channel_pos_offsets[0] = 0; |
169 | 7.30k | for (size_t i = 1; i < channel_count; ++i) { |
170 | 6.38k | _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1]; |
171 | 6.38k | } |
172 | 985 | for (uint32_t i = 0; i < rows; ++i) { |
173 | 70 | if (channel_ids[i] == invalid_val) { |
174 | 2 | continue; |
175 | 2 | } |
176 | 68 | auto cid = channel_ids[i]; |
177 | 68 | auto pos = _channel_pos_offsets[cid]++; |
178 | 68 | _origin_row_idx[pos] = i; |
179 | 68 | } |
180 | | |
181 | 915 | return _add_rows_impl(state, channels, channel_count, block, eos); |
182 | 915 | } |
183 | | |
184 | | Status ExchangeTrivialWriter::_channel_add_rows(RuntimeState* state, |
185 | | std::vector<std::shared_ptr<Channel>>& channels, |
186 | | size_t channel_count, |
187 | | const std::vector<HashValType>& channel_ids, |
188 | 277k | size_t rows, Block* block, bool eos) { |
189 | 277k | _origin_row_idx.resize(rows); |
190 | 277k | _channel_rows_histogram.assign(channel_count, 0U); |
191 | 277k | _channel_pos_offsets.resize(channel_count); |
192 | 17.6M | for (size_t i = 0; i < rows; ++i) { |
193 | 17.3M | _channel_rows_histogram[channel_ids[i]]++; |
194 | 17.3M | } |
195 | 277k | _channel_pos_offsets[0] = 0; |
196 | 3.53M | for (size_t i = 1; i < channel_count; ++i) { |
197 | 3.25M | _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1]; |
198 | 3.25M | } |
199 | 17.5M | for (uint32_t i = 0; i < rows; i++) { |
200 | 17.3M | auto cid = channel_ids[i]; |
201 | 17.3M | auto pos = _channel_pos_offsets[cid]++; |
202 | 17.3M | _origin_row_idx[pos] = i; |
203 | 17.3M | } |
204 | | |
205 | 277k | return _add_rows_impl(state, channels, channel_count, block, eos); |
206 | 277k | } |
207 | | |
208 | | } // namespace doris |