/root/doris/be/src/pipeline/shuffle/writer.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "writer.h" | 
| 19 |  |  | 
| 20 |  | #include "pipeline/exec/exchange_sink_operator.h" | 
| 21 |  | #include "vec/core/block.h" | 
| 22 |  |  | 
| 23 |  | namespace doris::pipeline { | 
| 24 |  | #include "common/compile_check_begin.h" | 
| 25 |  | template <typename ChannelPtrType> /* Records status `st` on `channel` via set_receiver_eof(), then closes the channel. */ | 
| 26 | 0 | void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const { | 
| 27 | 0 |     channel->set_receiver_eof(st); | 
| 28 |  |     // Channel will not send RPC to the downstream when eof, so close channel by OK status. | 
| 29 | 0 |     static_cast<void>(channel->close(state)); /* the Status returned by close() is deliberately discarded */ | 
| 30 | 0 | } | 
| 31 |  |  | 
| 32 |  | Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state, | 
| 33 | 0 |                      vectorized::Block* block, bool eos) const { /* Runs the partitioner on `block`, then distributes its rows across local_state->channels by channel id. */ | 
| 34 | 0 |     bool already_sent = false; /* out-flag filled in by do_partitioning() below */ | 
| 35 | 0 |     { | 
| 36 | 0 |         SCOPED_TIMER(local_state->split_block_hash_compute_timer()); | 
| 37 | 0 |         RETURN_IF_ERROR( | 
| 38 | 0 |                 local_state->partitioner()->do_partitioning(state, block, eos, &already_sent)); | 
| 39 | 0 |     } | 
| 40 | 0 |     if (already_sent) { | 
| 41 |  |         // The same block may be sent twice by TabletSinkHashPartitioner. To get the correct | 
| 42 |  |         // result, we should not send any rows the last time. | 
| 43 | 0 |         return Status::OK(); | 
| 44 | 0 |     } | 
| 45 | 0 |     auto rows = block->rows(); | 
| 46 | 0 |     { | 
| 47 | 0 |         SCOPED_TIMER(local_state->distribute_rows_into_channels_timer()); | 
| 48 | 0 |         const auto& channel_filed = local_state->partitioner()->get_channel_ids(); /* NOTE(review): "channel_filed" is presumably a typo for "channel_field" */ | 
| 49 | 0 |         if (channel_filed.len == sizeof(uint32_t)) { /* `len` selects the id element type: uint32_t here, int64_t in the else branch */ | 
| 50 | 0 |             RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, | 
| 51 | 0 |                                               local_state->channels.size(), | 
| 52 | 0 |                                               channel_filed.get<uint32_t>(), rows, block, eos)); | 
| 53 | 0 |         } else { | 
| 54 | 0 |             RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels, | 
| 55 | 0 |                                               local_state->channels.size(), | 
| 56 | 0 |                                               channel_filed.get<int64_t>(), rows, block, eos)); | 
| 57 | 0 |         } | 
| 58 | 0 |     } | 
| 59 | 0 |     return Status::OK(); | 
| 60 | 0 | } | 
| 61 |  |  | 
| 62 |  | template <typename ChannelIdType> /* Groups the row indices of `block` by channel id with a counting sort, then passes each channel its contiguous slice of `row_idx` through add_rows(). */ | 
| 63 |  | Status Writer::_channel_add_rows(RuntimeState* state, | 
| 64 |  |                                  std::vector<std::shared_ptr<vectorized::Channel>>& channels, | 
| 65 |  |                                  size_t partition_count, | 
| 66 |  |                                  const ChannelIdType* __restrict channel_ids, size_t rows, | 
| 67 | 0 |                                  vectorized::Block* block, bool eos) const { | 
| 68 | 0 |     std::vector<uint32_t> partition_rows_histogram; | 
| 69 | 0 |     auto row_idx = vectorized::PODArray<uint32_t>(rows); | 
| 70 | 0 |     { | 
| 71 | 0 |         partition_rows_histogram.assign(partition_count + 2, 0); | 
| 72 | 0 |         for (size_t i = 0; i < rows; ++i) { /* pass 1: count rows per channel; channel c is counted at index c + 1 */ | 
| 73 | 0 |             partition_rows_histogram[channel_ids[i] + 1]++; | 
| 74 | 0 |         } | 
| 75 | 0 |         for (size_t i = 1; i <= partition_count + 1; ++i) { /* pass 2: running sum, so index c + 1 holds the end offset of channel c's rows */ | 
| 76 | 0 |             partition_rows_histogram[i] += partition_rows_histogram[i - 1]; | 
| 77 | 0 |         } | 
| 78 | 0 |         for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) { /* pass 3: place row indices back to front; each store decrements the counter, leaving index c + 1 at the start offset of channel c */ | 
| 79 | 0 |             row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i; | 
| 80 | 0 |             partition_rows_histogram[channel_ids[i] + 1]--; | 
| 81 | 0 |         } | 
| 82 | 0 |     } | 
| 83 | 0 |     Status status = Status::OK(); | 
| 84 | 0 |     for (size_t i = 0; i < partition_count; ++i) { | 
| 85 | 0 |         uint32_t start = partition_rows_histogram[i + 1]; /* first position of channel i inside row_idx */ | 
| 86 | 0 |         uint32_t size = partition_rows_histogram[i + 2] - start; /* next channel's start minus this one's = row count for channel i */ | 
| 87 | 0 |         if (!channels[i]->is_receiver_eof() && size > 0) { /* skip channels already at receiver-EOF and channels with no rows */ | 
| 88 | 0 |             status = channels[i]->add_rows(block, row_idx.data(), start, size, false); | 
| 89 | 0 |             HANDLE_CHANNEL_STATUS(state, channels[i], status); /* NOTE(review): macro is defined outside this file; presumably routes EOF statuses to _handle_eof_channel and returns other errors - confirm */ | 
| 90 | 0 |         } | 
| 91 | 0 |     } | 
| 92 | 0 |     if (eos) { /* at end of stream, call add_rows once more per live channel with an empty range and the last argument true (it is false above) */ | 
| 93 | 0 |         for (int i = 0; i < partition_count; ++i) { /* NOTE(review): signed int compared with size_t partition_count; the loop above uses size_t */ | 
| 94 | 0 |             if (!channels[i]->is_receiver_eof()) { | 
| 95 | 0 |                 status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true); | 
| 96 | 0 |                 HANDLE_CHANNEL_STATUS(state, channels[i], status); | 
| 97 | 0 |             } | 
| 98 | 0 |         } | 
| 99 | 0 |     } | 
| 100 | 0 |     return Status::OK(); | 
| 101 | 0 | } Unexecuted instantiation: _ZNK5doris8pipeline6Writer17_channel_add_rowsIjEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEbUnexecuted instantiation: _ZNK5doris8pipeline6Writer17_channel_add_rowsIlEENS_6StatusEPNS_12RuntimeStateERSt6vectorISt10shared_ptrINS_10vectorized7ChannelEESaISA_EEmPKT_mPNS8_5BlockEb | 
| 102 |  |  | 
| 103 |  | } // namespace doris::pipeline |