/root/doris/be/src/pipeline/shuffle/writer.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "writer.h" |
19 | | |
20 | | #include "pipeline/exec/exchange_sink_operator.h" |
21 | | #include "vec/core/block.h" |
22 | | |
23 | | namespace doris::pipeline { |
24 | | #include "common/compile_check_begin.h" |
25 | | template <typename ChannelPtrType> |
26 | 0 | void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { |
27 | 0 | channel->set_receiver_eof(st); |
28 | | // Chanel will not send RPC to the downstream when eof, so close chanel by OK status. |
29 | 0 | static_cast<void>(channel->close(state)); |
30 | 0 | } |
31 | | |
32 | | Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state, |
33 | 0 | vectorized::Block* block, bool eos) const { |
34 | 0 | bool already_sent = false; |
35 | 0 | { |
36 | 0 | SCOPED_TIMER(local_state->split_block_hash_compute_timer()); |
37 | 0 | RETURN_IF_ERROR( |
38 | 0 | local_state->partitioner()->do_partitioning(state, block, eos, &already_sent)); |
39 | 0 | } |
40 | 0 | if (already_sent) { |
41 | | // The same block may be sent twice by TabletSinkHashPartitioner. To get the correct |
42 | | // result, we should not send any rows the last time. |
43 | 0 | return Status::OK(); |
44 | 0 | } |
45 | 0 | auto rows = block->rows(); |
46 | 0 | { |
47 | 0 | SCOPED_TIMER(local_state->distribute_rows_into_channels_timer()); |
48 | 0 | const auto& channel_filed = local_state->partitioner()->get_channel_ids(); |
49 | 0 | if (channel_filed.len == sizeof(uint32_t)) { |
50 | 0 | RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, |
51 | 0 | local_state->channels.size(), |
52 | 0 | channel_filed.get<uint32_t>(), rows, block, eos)); |
53 | 0 | } else { |
54 | 0 | RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, |
55 | 0 | local_state->channels.size(), |
56 | 0 | channel_filed.get<int64_t>(), rows, block, eos)); |
57 | 0 | } |
58 | 0 | } |
59 | 0 | return Status::OK(); |
60 | 0 | } |
61 | | |
62 | | template <typename ChannelIdType> |
63 | | Status Writer::_channel_add_rows(RuntimeState* state, |
64 | | std::vector<std::shared_ptr<vectorized::Channel>>& channels, |
65 | | size_t partition_count, |
66 | | const ChannelIdType* __restrict channel_ids, size_t rows, |
67 | 0 | vectorized::Block* block, bool eos) const { |
68 | 0 | std::vector<uint32_t> partition_rows_histogram; |
69 | 0 | auto row_idx = vectorized::PODArray<uint32_t>(rows); |
70 | 0 | { |
71 | 0 | partition_rows_histogram.assign(partition_count + 2, 0); |
72 | 0 | for (size_t i = 0; i < rows; ++i) { |
73 | 0 | partition_rows_histogram[channel_ids[i] + 1]++; |
74 | 0 | } |
75 | 0 | for (size_t i = 1; i <= partition_count + 1; ++i) { |
76 | 0 | partition_rows_histogram[i] += partition_rows_histogram[i - 1]; |
77 | 0 | } |
78 | 0 | for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) { |
79 | 0 | row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i; |
80 | 0 | partition_rows_histogram[channel_ids[i] + 1]--; |
81 | 0 | } |
82 | 0 | } |
83 | 0 | Status status = Status::OK(); |
84 | 0 | for (size_t i = 0; i < partition_count; ++i) { |
85 | 0 | uint32_t start = partition_rows_histogram[i + 1]; |
86 | 0 | uint32_t size = partition_rows_histogram[i + 2] - start; |
87 | 0 | if (!channels[i]->is_receiver_eof() && size > 0) { |
88 | 0 | status = channels[i]->add_rows(block, row_idx.data(), start, size, false); |
89 | 0 | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
90 | 0 | } |
91 | 0 | } |
92 | 0 | if (eos) { |
93 | 0 | for (int i = 0; i < partition_count; ++i) { |
94 | 0 | if (!channels[i]->is_receiver_eof()) { |
95 | 0 | status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true); |
96 | 0 | HANDLE_CHANNEL_STATUS(state, channels[i], status); |
97 | 0 | } |
98 | 0 | } |
99 | 0 | } |
100 | 0 | return Status::OK(); |
101 | 0 | } Unexecuted instantiation: _ZNK5doris8pipeline6Writer17_channel_add_rowsIjEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEb Unexecuted instantiation: _ZNK5doris8pipeline6Writer17_channel_add_rowsIlEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEb |
102 | | |
103 | | } // namespace doris::pipeline |