Coverage Report

Created: 2026-03-12 13:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/exchange_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/exchange_writer.h"
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
#include <cstdint>
24
#include <vector>
25
26
#include "common/logging.h"
27
#include "common/status.h"
28
#include "core/assert_cast.h"
29
#include "core/block/block.h"
30
#include "exec/operator/exchange_sink_operator.h"
31
#include "exec/sink/tablet_sink_hash_partitioner.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
ExchangeWriterBase::ExchangeWriterBase(ExchangeSinkLocalState& local_state)
37
700k
        : _local_state(local_state), _partitioner(local_state.partitioner()) {}
38
39
template <typename ChannelPtrType>
40
Status ExchangeWriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
41
22.8k
                                               Status st) const {
42
22.8k
    channel->set_receiver_eof(st);
43
    // Channel will not send RPC to the downstream when eof, so close channel by OK status.
44
22.8k
    return channel->close(state);
45
22.8k
}
46
47
// NOLINTBEGIN(readability-function-cognitive-complexity)
48
Status ExchangeWriterBase::_add_rows_impl(RuntimeState* state,
49
                                          std::vector<std::shared_ptr<Channel>>& channels,
50
275k
                                          size_t channel_count, Block* block, bool eos) {
51
275k
    Status status = Status::OK();
52
275k
    uint32_t offset = 0;
53
5.25M
    for (size_t i = 0; i < channel_count; ++i) {
54
4.98M
        uint32_t size = _channel_rows_histogram[i];
55
4.98M
        if (!channels[i]->is_receiver_eof() && size > 0) {
56
18.4E
            VLOG_DEBUG << fmt::format("partition {} of {}, block:\n{}, start: {}, size: {}", i,
57
18.4E
                                      channel_count, block->dump_data(), offset, size);
58
71.4k
            status = channels[i]->add_rows(block, _origin_row_idx.data(), offset, size, false);
59
71.4k
            HANDLE_CHANNEL_STATUS(state, channels[i], status);
60
71.4k
        }
61
4.98M
        offset += size;
62
4.98M
    }
63
275k
    if (eos) {
64
4.99M
        for (int i = 0; i < channel_count; ++i) {
65
4.75M
            if (!channels[i]->is_receiver_eof()) {
66
18.4E
                VLOG_DEBUG << fmt::format("EOS partition {} of {}, block:\n{}", i, channel_count,
67
18.4E
                                          block->dump_data());
68
4.75M
                status = channels[i]->add_rows(block, _origin_row_idx.data(), 0, 0, true);
69
4.75M
                HANDLE_CHANNEL_STATUS(state, channels[i], status);
70
4.75M
            }
71
4.73M
        }
72
261k
    }
73
275k
    return Status::OK();
74
275k
}
75
// NOLINTEND(readability-function-cognitive-complexity)
76
77
3.89k
Status ExchangeOlapWriter::write(RuntimeState* state, Block* block, bool eos) {
78
3.89k
    Block prior_block;
79
3.89k
    auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner);
80
3.89k
    RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(prior_block));
81
3.89k
    if (!prior_block.empty()) {
82
        // prior_block (batching rows) cuts in line, deal it first.
83
0
        RETURN_IF_ERROR(_write_impl(state, &prior_block));
84
0
        tablet_partitioner->finish_cut_in_line();
85
0
    }
86
87
3.89k
    RETURN_IF_ERROR(_write_impl(state, block));
88
89
    // all data wrote. consider batched rows before eos.
90
3.88k
    if (eos) {
91
        // get all batched rows
92
2.80k
        tablet_partitioner->mark_last_block();
93
2.80k
        Block final_batching_block;
94
2.80k
        RETURN_IF_ERROR(tablet_partitioner->try_cut_in_line(final_batching_block));
95
2.80k
        if (!final_batching_block.empty()) {
96
12
            RETURN_IF_ERROR(_write_impl(state, &final_batching_block, true));
97
2.79k
        } else {
98
            // No batched rows, send empty block with eos signal.
99
2.79k
            Block empty_block = block->clone_empty();
100
2.79k
            RETURN_IF_ERROR(_write_impl(state, &empty_block, true));
101
2.79k
        }
102
2.80k
    }
103
3.88k
    return Status::OK();
104
3.88k
}
105
106
6.69k
Status ExchangeOlapWriter::_write_impl(RuntimeState* state, Block* block, bool eos) {
107
6.69k
    auto rows = block->rows();
108
6.69k
    auto* tablet_partitioner = assert_cast<TabletSinkHashPartitioner*>(_partitioner);
109
6.69k
    {
110
6.69k
        SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
111
6.69k
        RETURN_IF_ERROR(tablet_partitioner->do_partitioning(state, block));
112
6.69k
    }
113
6.69k
    {
114
6.69k
        SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
115
6.69k
        const auto& channel_ids = tablet_partitioner->get_channel_ids();
116
6.69k
        const auto invalid_val = tablet_partitioner->invalid_sentinel();
117
6.69k
        DCHECK_EQ(channel_ids.size(), rows);
118
119
        // decrease not sinked rows this time
120
6.69k
        COUNTER_UPDATE(_local_state.rows_input_counter(),
121
6.69k
                       -1LL * std::ranges::count(channel_ids, invalid_val));
122
123
6.69k
        RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels,
124
6.69k
                                          _local_state.channels.size(), channel_ids, rows, block,
125
6.69k
                                          eos, invalid_val));
126
6.69k
    }
127
6.69k
    return Status::OK();
128
6.69k
}
129
130
267k
Status ExchangeTrivialWriter::write(RuntimeState* state, Block* block, bool eos) {
131
267k
    auto rows = block->rows();
132
267k
    {
133
267k
        SCOPED_TIMER(_local_state.split_block_hash_compute_timer());
134
267k
        RETURN_IF_ERROR(_partitioner->do_partitioning(state, block));
135
267k
    }
136
267k
    {
137
267k
        SCOPED_TIMER(_local_state.distribute_rows_into_channels_timer());
138
267k
        const auto& channel_ids = _partitioner->get_channel_ids();
139
140
267k
        RETURN_IF_ERROR(_channel_add_rows(state, _local_state.channels,
141
267k
                                          _local_state.channels.size(), channel_ids, rows, block,
142
267k
                                          eos));
143
267k
    }
144
145
267k
    return Status::OK();
146
267k
}
147
148
Status ExchangeOlapWriter::_channel_add_rows(RuntimeState* state,
149
                                             std::vector<std::shared_ptr<Channel>>& channels,
150
                                             size_t channel_count,
151
                                             const std::vector<HashValType>& channel_ids,
152
                                             size_t rows, Block* block, bool eos,
153
6.69k
                                             HashValType invalid_val) {
154
6.69k
    size_t effective_rows = 0;
155
6.69k
    effective_rows =
156
2.70M
            std::ranges::count_if(channel_ids, [=](int64_t cid) { return cid != invalid_val; });
157
158
    // row index will skip all skipped rows.
159
6.69k
    _origin_row_idx.resize(effective_rows);
160
6.69k
    _channel_rows_histogram.assign(channel_count, 0U);
161
6.69k
    _channel_pos_offsets.resize(channel_count);
162
2.53M
    for (size_t i = 0; i < rows; ++i) {
163
2.52M
        if (channel_ids[i] == invalid_val) {
164
23
            continue;
165
23
        }
166
2.52M
        auto cid = channel_ids[i];
167
2.52M
        _channel_rows_histogram[cid]++;
168
2.52M
    }
169
6.69k
    _channel_pos_offsets[0] = 0;
170
44.7k
    for (size_t i = 1; i < channel_count; ++i) {
171
38.0k
        _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1];
172
38.0k
    }
173
2.51M
    for (uint32_t i = 0; i < rows; ++i) {
174
2.50M
        if (channel_ids[i] == invalid_val) {
175
23
            continue;
176
23
        }
177
2.50M
        auto cid = channel_ids[i];
178
2.50M
        auto pos = _channel_pos_offsets[cid]++;
179
2.50M
        _origin_row_idx[pos] = i;
180
2.50M
    }
181
182
6.69k
    return _add_rows_impl(state, channels, channel_count, block, eos);
183
6.69k
}
184
185
Status ExchangeTrivialWriter::_channel_add_rows(RuntimeState* state,
186
                                                std::vector<std::shared_ptr<Channel>>& channels,
187
                                                size_t channel_count,
188
                                                const std::vector<HashValType>& channel_ids,
189
267k
                                                size_t rows, Block* block, bool eos) {
190
267k
    _origin_row_idx.resize(rows);
191
267k
    _channel_rows_histogram.assign(channel_count, 0U);
192
267k
    _channel_pos_offsets.resize(channel_count);
193
19.7M
    for (size_t i = 0; i < rows; ++i) {
194
19.4M
        _channel_rows_histogram[channel_ids[i]]++;
195
19.4M
    }
196
267k
    _channel_pos_offsets[0] = 0;
197
4.93M
    for (size_t i = 1; i < channel_count; ++i) {
198
4.66M
        _channel_pos_offsets[i] = _channel_pos_offsets[i - 1] + _channel_rows_histogram[i - 1];
199
4.66M
    }
200
19.6M
    for (uint32_t i = 0; i < rows; i++) {
201
19.3M
        auto cid = channel_ids[i];
202
19.3M
        auto pos = _channel_pos_offsets[cid]++;
203
19.3M
        _origin_row_idx[pos] = i;
204
19.3M
    }
205
206
267k
    return _add_rows_impl(state, channels, channel_count, block, eos);
207
267k
}
208
209
} // namespace doris