Coverage Report

Created: 2026-04-10 12:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_repartitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_repartitioner.h"
19
20
#include <glog/logging.h>
21
22
#include <limits>
23
#include <memory>
24
#include <vector>
25
26
#include "core/block/block.h"
27
#include "core/column/column.h"
28
#include "exec/partitioner/partitioner.h"
29
#include "exec/spill/spill_file.h"
30
#include "exec/spill/spill_file_manager.h"
31
#include "exec/spill/spill_file_reader.h"
32
#include "exec/spill/spill_file_writer.h"
33
#include "runtime/exec_env.h"
34
#include "runtime/runtime_profile.h"
35
#include "runtime/runtime_state.h"
36
#include "util/uid_util.h"
37
38
namespace doris {
39
40
void SpillRepartitioner::init(std::unique_ptr<PartitionerBase> partitioner, RuntimeProfile* profile,
41
4
                              int fanout, int repartition_level) {
42
4
    _partitioner = std::move(partitioner);
43
4
    _use_column_index_mode = false;
44
4
    _fanout = fanout;
45
4
    _repartition_level = repartition_level;
46
4
    _operator_profile = profile;
47
4
    _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 1);
48
4
    _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, "SpillRepartitionRows", TUnit::UNIT, 1);
49
4
}
50
51
void SpillRepartitioner::init_with_key_columns(std::vector<size_t> key_column_indices,
52
                                               std::vector<DataTypePtr> key_data_types,
53
                                               RuntimeProfile* profile, int fanout,
54
12
                                               int repartition_level) {
55
12
    _key_column_indices = std::move(key_column_indices);
56
12
    _key_data_types = std::move(key_data_types);
57
12
    _use_column_index_mode = true;
58
12
    _partitioner.reset();
59
12
    _fanout = fanout;
60
12
    _repartition_level = repartition_level;
61
12
    _operator_profile = profile;
62
12
    _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 1);
63
12
    _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, "SpillRepartitionRows", TUnit::UNIT, 1);
64
12
}
65
66
Status SpillRepartitioner::setup_output(RuntimeState* state,
67
15
                                        std::vector<SpillFileSPtr>& output_spill_files) {
68
15
    DCHECK_EQ(output_spill_files.size(), _fanout);
69
15
    _output_spill_files = &output_spill_files;
70
15
    _output_writers.resize(_fanout);
71
99
    for (int i = 0; i < _fanout; ++i) {
72
84
        RETURN_IF_ERROR(
73
84
                output_spill_files[i]->create_writer(state, _operator_profile, _output_writers[i]));
74
84
    }
75
    // Reset reader state from any previous repartition session
76
15
    _input_reader.reset();
77
15
    _current_input_file.reset();
78
15
    return Status::OK();
79
15
}
80
81
Status SpillRepartitioner::repartition(RuntimeState* state, SpillFileSPtr& input_spill_file,
82
3
                                       bool* done) {
83
3
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
84
3
    SCOPED_TIMER(_repartition_timer);
85
86
3
    *done = false;
87
3
    size_t accumulated_bytes = 0;
88
89
    // Create or reuse input reader. If the input file changed, create a new reader.
90
3
    if (_current_input_file != input_spill_file) {
91
3
        _current_input_file = input_spill_file;
92
3
        _input_reader = input_spill_file->create_reader(state, _operator_profile);
93
3
        RETURN_IF_ERROR(_input_reader->open());
94
3
    }
95
96
    // Per-partition write buffers to batch small writes
97
3
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
98
99
3
    bool eos = false;
100
11
    while (!eos && !state->is_cancelled()) {
101
8
        Block block;
102
8
        RETURN_IF_ERROR(_input_reader->read(&block, &eos));
103
104
8
        if (block.empty()) {
105
3
            continue;
106
3
        }
107
108
5
        accumulated_bytes += block.allocated_bytes();
109
5
        COUNTER_UPDATE(_repartition_rows, block.rows());
110
111
5
        if (_use_column_index_mode) {
112
1
            RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
113
4
        } else {
114
4
            RETURN_IF_ERROR(_route_block(state, block, output_buffers));
115
4
        }
116
117
        // Yield after processing MAX_BATCH_BYTES to let pipeline scheduler re-schedule
118
5
        if (accumulated_bytes >= MAX_BATCH_BYTES && !eos) {
119
0
            break;
120
0
        }
121
5
    }
122
123
    // Flush all remaining buffers
124
3
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
125
126
3
    if (eos) {
127
3
        *done = true;
128
        // Reset reader for this input file
129
3
        _input_reader.reset();
130
3
        _current_input_file.reset();
131
3
    }
132
133
3
    return Status::OK();
134
3
}
135
136
Status SpillRepartitioner::repartition(RuntimeState* state, SpillFileReaderSPtr& reader,
137
3
                                       bool* done) {
138
3
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
139
3
    DCHECK(reader != nullptr) << "reader must not be null";
140
3
    SCOPED_TIMER(_repartition_timer);
141
142
3
    *done = false;
143
3
    size_t accumulated_bytes = 0;
144
145
    // Per-partition write buffers to batch small writes
146
3
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
147
148
3
    bool eos = false;
149
9
    while (!eos && !state->is_cancelled()) {
150
6
        Block block;
151
6
        RETURN_IF_ERROR(reader->read(&block, &eos));
152
153
6
        if (block.empty()) {
154
3
            continue;
155
3
        }
156
157
3
        accumulated_bytes += block.allocated_bytes();
158
3
        COUNTER_UPDATE(_repartition_rows, block.rows());
159
160
3
        if (_use_column_index_mode) {
161
1
            RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
162
2
        } else {
163
2
            RETURN_IF_ERROR(_route_block(state, block, output_buffers));
164
2
        }
165
166
        // Yield after processing MAX_BATCH_BYTES to let pipeline scheduler re-schedule
167
3
        if (accumulated_bytes >= MAX_BATCH_BYTES && !eos) {
168
0
            break;
169
0
        }
170
3
    }
171
172
    // Flush all remaining buffers
173
3
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
174
175
3
    if (eos) {
176
3
        *done = true;
177
3
        reader.reset();
178
3
    }
179
180
3
    return Status::OK();
181
3
}
182
183
12
Status SpillRepartitioner::route_block(RuntimeState* state, Block& block) {
184
12
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
185
12
    if (UNLIKELY(_output_spill_files == nullptr)) {
186
0
        return Status::InternalError("SpillRepartitioner::setup_output() must be called first");
187
0
    }
188
12
    SCOPED_TIMER(_repartition_timer);
189
190
12
    if (block.empty()) {
191
1
        return Status::OK();
192
1
    }
193
194
11
    COUNTER_UPDATE(_repartition_rows, block.rows());
195
196
11
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
197
11
    if (_use_column_index_mode) {
198
9
        RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
199
9
    } else {
200
2
        RETURN_IF_ERROR(_route_block(state, block, output_buffers));
201
2
    }
202
11
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
203
11
    return Status::OK();
204
11
}
205
206
15
Status SpillRepartitioner::finalize() {
207
15
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
208
15
    if (UNLIKELY(_output_spill_files == nullptr)) {
209
0
        return Status::InternalError("SpillRepartitioner::setup_output() must be called first");
210
0
    }
211
    // Close all writers (Writer::close() automatically updates SpillFile stats)
212
99
    for (int i = 0; i < _fanout; ++i) {
213
84
        if (_output_writers[i]) {
214
84
            RETURN_IF_ERROR(_output_writers[i]->close());
215
84
        }
216
84
    }
217
15
    _output_writers.clear();
218
15
    _output_spill_files = nullptr;
219
15
    _input_reader.reset();
220
15
    _current_input_file.reset();
221
15
    return Status::OK();
222
15
}
223
224
Status SpillRepartitioner::create_output_spill_files(
225
        RuntimeState* state, int node_id, const std::string& label_prefix, int fanout,
226
16
        std::vector<SpillFileSPtr>& output_spill_files) {
227
16
    output_spill_files.resize(fanout);
228
106
    for (int i = 0; i < fanout; ++i) {
229
90
        auto relative_path = fmt::format("{}/{}_sub{}-{}-{}-{}", print_id(state->query_id()),
230
90
                                         label_prefix, i, node_id, state->task_id(),
231
90
                                         ExecEnv::GetInstance()->spill_file_mgr()->next_id());
232
90
        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
233
90
                relative_path, output_spill_files[i]));
234
90
    }
235
16
    return Status::OK();
236
16
}
237
238
Status SpillRepartitioner::_route_block(
239
        RuntimeState* state, Block& block,
240
8
        std::vector<std::unique_ptr<MutableBlock>>& output_buffers) {
241
    // Compute raw hash values for every row in the block.
242
8
    RETURN_IF_ERROR(_partitioner->do_partitioning(state, &block));
243
8
    const auto& hash_vals = _partitioner->get_channel_ids();
244
8
    const auto rows = block.rows();
245
246
    // Build per-partition row index lists
247
8
    std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
248
28
    for (uint32_t i = 0; i < rows; ++i) {
249
20
        auto partition_idx = _map_hash_to_partition(hash_vals[i]);
250
20
        partition_row_indexes[partition_idx].emplace_back(i);
251
20
    }
252
253
    // Scatter rows into per-partition buffers
254
72
    for (int p = 0; p < _fanout; ++p) {
255
64
        if (partition_row_indexes[p].empty()) {
256
44
            continue;
257
44
        }
258
259
        // Lazily initialize the buffer
260
20
        if (!output_buffers[p]) {
261
18
            output_buffers[p] = MutableBlock::create_unique(block.clone_empty());
262
18
        }
263
264
20
        RETURN_IF_ERROR(output_buffers[p]->add_rows(
265
20
                &block, partition_row_indexes[p].data(),
266
20
                partition_row_indexes[p].data() + partition_row_indexes[p].size()));
267
268
        // Flush large buffers immediately to keep memory bounded
269
20
        if (output_buffers[p]->allocated_bytes() >= MAX_BATCH_BYTES) {
270
0
            RETURN_IF_ERROR(_flush_buffer(state, p, output_buffers[p]));
271
0
        }
272
20
    }
273
274
8
    return Status::OK();
275
8
}
276
277
Status SpillRepartitioner::_route_block_by_columns(
278
        RuntimeState* state, Block& block,
279
11
        std::vector<std::unique_ptr<MutableBlock>>& output_buffers) {
280
11
    const auto rows = block.rows();
281
11
    if (rows == 0) {
282
0
        return Status::OK();
283
0
    }
284
285
    // Compute CRC32 hash on key columns
286
11
    std::vector<uint32_t> hash_vals(rows, 0);
287
11
    auto* __restrict hashes = hash_vals.data();
288
22
    for (size_t j = 0; j < _key_column_indices.size(); ++j) {
289
11
        auto col_idx = _key_column_indices[j];
290
11
        DCHECK_LT(col_idx, block.columns());
291
11
        const auto& column = block.get_by_position(col_idx).column;
292
11
        column->update_crcs_with_value(hashes, _key_data_types[j]->get_primitive_type(),
293
11
                                       static_cast<uint32_t>(rows));
294
11
    }
295
296
    // Map hash values to output channels with level-aware mixing.
297
641
    for (size_t i = 0; i < rows; ++i) {
298
630
        hashes[i] = _map_hash_to_partition(hashes[i]);
299
630
    }
300
301
    // Build per-partition row index lists
302
11
    std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
303
641
    for (uint32_t i = 0; i < rows; ++i) {
304
630
        partition_row_indexes[hashes[i]].emplace_back(i);
305
630
    }
306
307
    // Scatter rows into per-partition buffers
308
55
    for (int p = 0; p < _fanout; ++p) {
309
44
        if (partition_row_indexes[p].empty()) {
310
0
            continue;
311
0
        }
312
313
44
        if (!output_buffers[p]) {
314
44
            output_buffers[p] = MutableBlock::create_unique(block.clone_empty());
315
44
        }
316
317
44
        RETURN_IF_ERROR(output_buffers[p]->add_rows(
318
44
                &block, partition_row_indexes[p].data(),
319
44
                partition_row_indexes[p].data() + partition_row_indexes[p].size()));
320
321
44
        if (output_buffers[p]->allocated_bytes() >= MAX_BATCH_BYTES) {
322
0
            RETURN_IF_ERROR(_flush_buffer(state, p, output_buffers[p]));
323
0
        }
324
44
    }
325
326
11
    return Status::OK();
327
11
}
328
329
Status SpillRepartitioner::_flush_buffer(RuntimeState* state, int partition_idx,
330
62
                                         std::unique_ptr<MutableBlock>& buffer) {
331
62
    if (!buffer || buffer->rows() == 0) {
332
0
        return Status::OK();
333
0
    }
334
62
    DCHECK(partition_idx < _fanout && _output_writers[partition_idx]);
335
62
    if (UNLIKELY(partition_idx >= _fanout || !_output_writers[partition_idx])) {
336
0
        return Status::InternalError(
337
0
                "SpillRepartitioner output writer is not initialized for partition {}",
338
0
                partition_idx);
339
0
    }
340
62
    auto out_block = buffer->to_block();
341
62
    buffer.reset();
342
62
    return _output_writers[partition_idx]->write_block(state, out_block);
343
62
}
344
345
Status SpillRepartitioner::_flush_all_buffers(
346
        RuntimeState* state, std::vector<std::unique_ptr<MutableBlock>>& output_buffers,
347
17
        bool force) {
348
109
    for (int i = 0; i < _fanout; ++i) {
349
92
        if (!output_buffers[i] || output_buffers[i]->rows() == 0) {
350
30
            continue;
351
30
        }
352
62
        if (force || output_buffers[i]->allocated_bytes() >= MAX_BATCH_BYTES) {
353
62
            RETURN_IF_ERROR(_flush_buffer(state, i, output_buffers[i]));
354
62
        }
355
62
    }
356
17
    return Status::OK();
357
17
}
358
359
650
uint32_t SpillRepartitioner::_map_hash_to_partition(uint32_t hash) const {
360
650
    DCHECK_GT(_fanout, 0);
361
    // Use a level-dependent salt so each repartition level has a different
362
    // projection from hash-space to partition-space.
363
650
    constexpr uint32_t LEVEL_SALT_BASE = 0x9E3779B9U;
364
650
    auto salt = static_cast<uint32_t>(_repartition_level + 1) * LEVEL_SALT_BASE;
365
650
    auto mixed = crc32c_shuffle_mix(hash ^ salt);
366
650
    return ((mixed >> 16) | (mixed << 16)) % static_cast<uint32_t>(_fanout);
367
650
}
368
369
} // namespace doris