be/src/exec/exchange/exchange_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/exchange_writer.h" |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstdint> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/logging.h" |
27 | | #include "common/status.h" |
28 | | #include "core/assert_cast.h" |
29 | | #include "core/block/block.h" |
30 | | #include "exec/operator/exchange_sink_operator.h" |
31 | | #include "exec/sink/tablet_sink_hash_partitioner.h" |
32 | | |
33 | | namespace doris { |
34 | | #include "common/compile_check_begin.h" |
35 | | |
36 | | ExchangeWriterBase::ExchangeWriterBase(ExchangeSinkLocalState& local_state) |
37 | 149k | : _local_state(local_state), _partitioner(local_state.partitioner()) {} |
38 | | |
39 | | template <typename ChannelPtrType> |
40 | | Status ExchangeWriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, |
41 | 84 | Status st) const { |
42 | 84 | channel->set_receiver_eof(st); |
43 | | // Channel will not send RPC to the downstream when eof, so close channel by OK status. |
44 | 84 | return channel->close(state); |
45 | 84 | } |
46 | | |
47 | | // NOLINTBEGIN(readability-function-cognitive-complexity) |
48 | | Status ExchangeWriterBase::_add_rows_impl(RuntimeState* state, |
49 | | std::vector<std::shared_ptr<Channel>>& channels, |
50 | 16.9k | size_t channel_count, Block* block, bool eos) { |
51 | 16.9k | Status status = Status::OK(); |
52 | 16.9k | uint32_t offset = 0; |
53 | 152k | for (size_t i = 0; i < channel_count; ++i) { |
54 | 135k | uint32_t size = _channel_rows_histogram[i]; |
55 | 135k | if (!channels[i]->is_receiver_eof() && size > 0) { |
56 | 45.6k | VLOG_DEBUG << fmt::format("partition {} of {}, block:\n{}, start: {}, size: {}", i, |
57 | 2 | channel_count, block->dump_data(), offset, size); |
58 | 45.6k | status = channels[i]->add_rows(block, _origin_row_idx.data(), offset, size, false); |
59 | 45.6k | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
60 | 45.6k | } |
61 | 135k | offset += size; |
62 | 135k | } |
63 | 16.9k | if (eos) { |
64 | 103k | for (int i = 0; i < channel_count; ++i) { |
65 | 91.5k | if (!channels[i]->is_receiver_eof()) { |
66 | 18.4E | VLOG_DEBUG << fmt::format("EOS partition {} of {}, block:\n{}", i, channel_count, |
67 | 18.4E | block->dump_data()); |
68 | 91.5k | status = channels[i]->add_rows(block, _origin_row_idx.data(), 0, 0, true); |
69 | 91.5k | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
70 | 91.5k | } |
71 | 91.5k | } |
72 | 11.4k | } |
73 | 16.9k | return Status::OK(); |
74 | 16.9k | } |
75 | | // NOLINTEND(readability-function-cognitive-complexity) |
76 | | |
77 | 336 | Status ExchangeOlapWriter::write(RuntimeState* state, Block* block, bool eos) { |
78 | 336 | Block prior_block; |
79 | 336 | auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner); |
80 | 336 | RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(prior_block)); |
81 | 336 | if (!prior_block.empty()) { |
82 | | // prior_block (batching rows) cuts in line, deal it first. |
83 | 0 | RETURN_IF_ERROR(_write_impl(state, &prior_block)); |
84 | 0 | tablet_partitioner->finish_cut_in_line(); |
85 | 0 | } |
86 | | |
87 | 336 | RETURN_IF_ERROR(_write_impl(state, block)); |
88 | | |
89 | | // all data wrote. consider batched rows before eos. |
90 | 336 | if (eos) { |
91 | | // get all batched rows |
92 | 304 | tablet_partitioner->mark_last_block(); |
93 | 304 | Block final_batching_block; |
94 | 304 | RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(final_batching_block)); |
95 | 304 | if (!final_batching_block.empty()) { |
96 | 0 | RETURN_IF_ERROR(_write_impl(state, &final_batching_block, true)); |
97 | 304 | } else { |
98 | | // No batched rows, send empty block with eos signal. |
99 | 304 | Block empty_block = block->clone_empty(); |
100 | 304 | RETURN_IF_ERROR(_write_impl(state, &empty_block, true)); |
101 | 304 | } |
102 | 304 | } |
103 | 336 | return Status::OK(); |
104 | 336 | } |
105 | | |
106 | 632 | Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool eos) { |
107 | 632 | auto rows = block->rows(); |
108 | 632 | auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner); |
109 | 632 | { |
110 | 632 | SCOPED_TIMER(_local_state.split_block_hash_compute_timer()); |
111 | 632 | RETURN_IF_ERROR(tablet_partitioner->do_partitioning(state, block)); |
112 | 632 | } |
113 | 632 | { |
114 | 632 | SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer()); |
115 | 632 | const auto& channel_ids = tablet_partitioner->get_channel_ids(); |
116 | 632 | const auto invalid_val = tablet_partitioner->invalid_sentinel(); |
117 | 632 | DCHECK_EQ(channel_ids.size(), rows); |
118 | | |
119 | | // decrease not sinked rows this time |
120 | 632 | COUNTER_UPDATE(_local_state.rows_input_counter(), |
121 | 632 | -1LL * std::ranges::count(channel_ids, invalid_val)); |
122 | | |
123 | 632 | RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels, |
124 | 632 | _local_state.channels.size(), channel_ids, rows, block, |
125 | 632 | eos, invalid_val)); |
126 | 632 | } |
127 | 632 | return Status::OK(); |
128 | 632 | } |
129 | | |
130 | 16.2k | Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) { |
131 | 16.2k | auto rows = block->rows(); |
132 | 16.2k | { |
133 | 16.2k | SCOPED_TIMER(_local_state.split_block_hash_compute_timer()); |
134 | 16.2k | RETURN_IF_ERROR(_partitioner->do_partitioning(state, block)); |
135 | 16.2k | } |
136 | 16.2k | { |
137 | 16.2k | SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer()); |
138 | 16.2k | const auto& channel_ids = _partitioner->get_channel_ids(); |
139 | | |
140 | 16.2k | RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels, |
141 | 16.2k | _local_state.channels.size(), channel_ids, rows, block, |
142 | 16.2k | eos)); |
143 | 16.2k | } |
144 | | |
145 | 16.2k | return Status::OK(); |
146 | 16.2k | } |
147 | | |
148 | | Status ExchangeOlapWriter::_channel_add_rows(RuntimeState* state, |
149 | | std::vector<std::shared_ptr<Channel>>& channels, |
150 | | size_t channel_count, |
151 | | const std::vector<HashValType>& channel_ids, |
152 | | size_t rows, Block* block, bool eos, |
153 | 639 | HashValType invalid_val) { |
154 | 639 | size_t effective_rows = 0; |
155 | 639 | effective_rows = |
156 | 639 | std::ranges::count_if(channel_ids, [=](int64_t cid) { return cid != invalid_val; }); |
157 | | |
158 | | // row index will skip all skipped rows. |
159 | 639 | _origin_row_idx.resize(effective_rows); |
160 | 639 | _channel_rows_histogram.assign(channel_count, 0U); |
161 | 639 | _channel_pos_offsets.resize(channel_count); |
162 | 695 | for (size_t i = 0; i < rows; ++i) { |
163 | 56 | if (channel_ids[i] == invalid_val) { |
164 | 2 | continue; |
165 | 2 | } |
166 | 54 | auto cid = channel_ids[i]; |
167 | 54 | _channel_rows_histogram[cid]++; |
168 | 54 | } |
169 | 639 | _channel_pos_offsets[0] = 0; |
170 | 5.12k | for (size_t i = 1; i < channel_count; ++i) { |
171 | 4.48k | _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1]; |
172 | 4.48k | } |
173 | 695 | for (uint32_t i = 0; i < rows; ++i) { |
174 | 56 | if (channel_ids[i] == invalid_val) { |
175 | 2 | continue; |
176 | 2 | } |
177 | 54 | auto cid = channel_ids[i]; |
178 | 54 | auto pos = _channel_pos_offsets[cid]++; |
179 | 54 | _origin_row_idx[pos] = i; |
180 | 54 | } |
181 | | |
182 | 639 | return _add_rows_impl(state, channels, channel_count, block, eos); |
183 | 639 | } |
184 | | |
185 | | Status ExchangeTrivialWriter::_channel_add_rows(RuntimeState* state, |
186 | | std::vector<std::shared_ptr<Channel>>& channels, |
187 | | size_t channel_count, |
188 | | const std::vector<HashValType>& channel_ids, |
189 | 16.2k | size_t rows, Block* block, bool eos) { |
190 | 16.2k | _origin_row_idx.resize(rows); |
191 | 16.2k | _channel_rows_histogram.assign(channel_count, 0U); |
192 | 16.2k | _channel_pos_offsets.resize(channel_count); |
193 | 30.2M | for (size_t i = 0; i < rows; ++i) { |
194 | 30.2M | _channel_rows_histogram[channel_ids[i]]++; |
195 | 30.2M | } |
196 | 16.2k | _channel_pos_offsets[0] = 0; |
197 | 130k | for (size_t i = 1; i < channel_count; ++i) { |
198 | 114k | _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1]; |
199 | 114k | } |
200 | 30.2M | for (uint32_t i = 0; i < rows; i++) { |
201 | 30.1M | auto cid = channel_ids[i]; |
202 | 30.1M | auto pos = _channel_pos_offsets[cid]++; |
203 | 30.1M | _origin_row_idx[pos] = i; |
204 | 30.1M | } |
205 | | |
206 | 16.2k | return _add_rows_impl(state, channels, channel_count, block, eos); |
207 | 16.2k | } |
208 | | |
209 | | } // namespace doris |