Coverage Report

Created: 2026-03-19 07:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_repartitioner.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_repartitioner.h"
19
20
#include <glog/logging.h>
21
22
#include <limits>
23
#include <memory>
24
#include <vector>
25
26
#include "core/block/block.h"
27
#include "core/column/column.h"
28
#include "exec/partitioner/partitioner.h"
29
#include "exec/spill/spill_file.h"
30
#include "exec/spill/spill_file_manager.h"
31
#include "exec/spill/spill_file_reader.h"
32
#include "exec/spill/spill_file_writer.h"
33
#include "runtime/exec_env.h"
34
#include "runtime/runtime_profile.h"
35
#include "runtime/runtime_state.h"
36
#include "util/uid_util.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
41
void SpillRepartitioner::init(std::unique_ptr<PartitionerBase> partitioner, RuntimeProfile* profile,
42
4
                              int fanout, int repartition_level) {
43
4
    _partitioner = std::move(partitioner);
44
4
    _use_column_index_mode = false;
45
4
    _fanout = fanout;
46
4
    _repartition_level = repartition_level;
47
4
    _operator_profile = profile;
48
4
    _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 1);
49
4
    _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, "SpillRepartitionRows", TUnit::UNIT, 1);
50
4
}
51
52
void SpillRepartitioner::init_with_key_columns(std::vector<size_t> key_column_indices,
53
                                               std::vector<DataTypePtr> key_data_types,
54
                                               RuntimeProfile* profile, int fanout,
55
12
                                               int repartition_level) {
56
12
    _key_column_indices = std::move(key_column_indices);
57
12
    _key_data_types = std::move(key_data_types);
58
12
    _use_column_index_mode = true;
59
12
    _partitioner.reset();
60
12
    _fanout = fanout;
61
12
    _repartition_level = repartition_level;
62
12
    _operator_profile = profile;
63
12
    _repartition_timer = ADD_TIMER_WITH_LEVEL(profile, "SpillRepartitionTime", 1);
64
12
    _repartition_rows = ADD_COUNTER_WITH_LEVEL(profile, "SpillRepartitionRows", TUnit::UNIT, 1);
65
12
}
66
67
Status SpillRepartitioner::setup_output(RuntimeState* state,
68
15
                                        std::vector<SpillFileSPtr>& output_spill_files) {
69
15
    DCHECK_EQ(output_spill_files.size(), _fanout);
70
15
    _output_spill_files = &output_spill_files;
71
15
    _output_writers.resize(_fanout);
72
99
    for (int i = 0; i < _fanout; ++i) {
73
84
        RETURN_IF_ERROR(
74
84
                output_spill_files[i]->create_writer(state, _operator_profile, _output_writers[i]));
75
84
    }
76
    // Reset reader state from any previous repartition session
77
15
    _input_reader.reset();
78
15
    _current_input_file.reset();
79
15
    return Status::OK();
80
15
}
81
82
Status SpillRepartitioner::repartition(RuntimeState* state, SpillFileSPtr& input_spill_file,
83
3
                                       bool* done) {
84
3
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
85
3
    SCOPED_TIMER(_repartition_timer);
86
87
3
    *done = false;
88
3
    size_t accumulated_bytes = 0;
89
90
    // Create or reuse input reader. If the input file changed, create a new reader.
91
3
    if (_current_input_file != input_spill_file) {
92
3
        _current_input_file = input_spill_file;
93
3
        _input_reader = input_spill_file->create_reader(state, _operator_profile);
94
3
        RETURN_IF_ERROR(_input_reader->open());
95
3
    }
96
97
    // Per-partition write buffers to batch small writes
98
3
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
99
100
3
    bool eos = false;
101
11
    while (!eos && !state->is_cancelled()) {
102
8
        Block block;
103
8
        RETURN_IF_ERROR(_input_reader->read(&block, &eos));
104
105
8
        if (block.empty()) {
106
3
            continue;
107
3
        }
108
109
5
        accumulated_bytes += block.allocated_bytes();
110
5
        COUNTER_UPDATE(_repartition_rows, block.rows());
111
112
5
        if (_use_column_index_mode) {
113
1
            RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
114
4
        } else {
115
4
            RETURN_IF_ERROR(_route_block(state, block, output_buffers));
116
4
        }
117
118
        // Yield after processing MAX_BATCH_BYTES to let pipeline scheduler re-schedule
119
5
        if (accumulated_bytes >= MAX_BATCH_BYTES && !eos) {
120
0
            break;
121
0
        }
122
5
    }
123
124
    // Flush all remaining buffers
125
3
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
126
127
3
    if (eos) {
128
3
        *done = true;
129
        // Reset reader for this input file
130
3
        _input_reader.reset();
131
3
        _current_input_file.reset();
132
3
    }
133
134
3
    return Status::OK();
135
3
}
136
137
Status SpillRepartitioner::repartition(RuntimeState* state, SpillFileReaderSPtr& reader,
138
3
                                       bool* done) {
139
3
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
140
3
    DCHECK(reader != nullptr) << "reader must not be null";
141
3
    SCOPED_TIMER(_repartition_timer);
142
143
3
    *done = false;
144
3
    size_t accumulated_bytes = 0;
145
146
    // Per-partition write buffers to batch small writes
147
3
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
148
149
3
    bool eos = false;
150
9
    while (!eos && !state->is_cancelled()) {
151
6
        Block block;
152
6
        RETURN_IF_ERROR(reader->read(&block, &eos));
153
154
6
        if (block.empty()) {
155
3
            continue;
156
3
        }
157
158
3
        accumulated_bytes += block.allocated_bytes();
159
3
        COUNTER_UPDATE(_repartition_rows, block.rows());
160
161
3
        if (_use_column_index_mode) {
162
1
            RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
163
2
        } else {
164
2
            RETURN_IF_ERROR(_route_block(state, block, output_buffers));
165
2
        }
166
167
        // Yield after processing MAX_BATCH_BYTES to let pipeline scheduler re-schedule
168
3
        if (accumulated_bytes >= MAX_BATCH_BYTES && !eos) {
169
0
            break;
170
0
        }
171
3
    }
172
173
    // Flush all remaining buffers
174
3
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
175
176
3
    if (eos) {
177
3
        *done = true;
178
3
        reader.reset();
179
3
    }
180
181
3
    return Status::OK();
182
3
}
183
184
12
Status SpillRepartitioner::route_block(RuntimeState* state, Block& block) {
185
12
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
186
12
    if (UNLIKELY(_output_spill_files == nullptr)) {
187
0
        return Status::InternalError("SpillRepartitioner::setup_output() must be called first");
188
0
    }
189
12
    SCOPED_TIMER(_repartition_timer);
190
191
12
    if (block.empty()) {
192
1
        return Status::OK();
193
1
    }
194
195
11
    COUNTER_UPDATE(_repartition_rows, block.rows());
196
197
11
    std::vector<std::unique_ptr<MutableBlock>> output_buffers(_fanout);
198
11
    if (_use_column_index_mode) {
199
9
        RETURN_IF_ERROR(_route_block_by_columns(state, block, output_buffers));
200
9
    } else {
201
2
        RETURN_IF_ERROR(_route_block(state, block, output_buffers));
202
2
    }
203
11
    RETURN_IF_ERROR(_flush_all_buffers(state, output_buffers, /*force=*/true));
204
11
    return Status::OK();
205
11
}
206
207
15
Status SpillRepartitioner::finalize() {
208
15
    DCHECK(_output_spill_files != nullptr) << "setup_output() must be called first";
209
15
    if (UNLIKELY(_output_spill_files == nullptr)) {
210
0
        return Status::InternalError("SpillRepartitioner::setup_output() must be called first");
211
0
    }
212
    // Close all writers (Writer::close() automatically updates SpillFile stats)
213
99
    for (int i = 0; i < _fanout; ++i) {
214
84
        if (_output_writers[i]) {
215
84
            RETURN_IF_ERROR(_output_writers[i]->close());
216
84
        }
217
84
    }
218
15
    _output_writers.clear();
219
15
    _output_spill_files = nullptr;
220
15
    _input_reader.reset();
221
15
    _current_input_file.reset();
222
15
    return Status::OK();
223
15
}
224
225
Status SpillRepartitioner::create_output_spill_files(
226
        RuntimeState* state, int node_id, const std::string& label_prefix, int fanout,
227
16
        std::vector<SpillFileSPtr>& output_spill_files) {
228
16
    output_spill_files.resize(fanout);
229
106
    for (int i = 0; i < fanout; ++i) {
230
90
        auto relative_path = fmt::format("{}/{}_sub{}-{}-{}-{}", print_id(state->query_id()),
231
90
                                         label_prefix, i, node_id, state->task_id(),
232
90
                                         ExecEnv::GetInstance()->spill_file_mgr()->next_id());
233
90
        RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
234
90
                relative_path, output_spill_files[i]));
235
90
    }
236
16
    return Status::OK();
237
16
}
238
239
Status SpillRepartitioner::_route_block(
240
        RuntimeState* state, Block& block,
241
8
        std::vector<std::unique_ptr<MutableBlock>>& output_buffers) {
242
    // Compute raw hash values for every row in the block.
243
8
    RETURN_IF_ERROR(_partitioner->do_partitioning(state, &block));
244
8
    const auto& hash_vals = _partitioner->get_channel_ids();
245
8
    const auto rows = block.rows();
246
247
    // Build per-partition row index lists
248
8
    std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
249
28
    for (uint32_t i = 0; i < rows; ++i) {
250
20
        auto partition_idx = _map_hash_to_partition(hash_vals[i]);
251
20
        partition_row_indexes[partition_idx].emplace_back(i);
252
20
    }
253
254
    // Scatter rows into per-partition buffers
255
72
    for (int p = 0; p < _fanout; ++p) {
256
64
        if (partition_row_indexes[p].empty()) {
257
44
            continue;
258
44
        }
259
260
        // Lazily initialize the buffer
261
20
        if (!output_buffers[p]) {
262
18
            output_buffers[p] = MutableBlock::create_unique(block.clone_empty());
263
18
        }
264
265
20
        RETURN_IF_ERROR(output_buffers[p]->add_rows(
266
20
                &block, partition_row_indexes[p].data(),
267
20
                partition_row_indexes[p].data() + partition_row_indexes[p].size()));
268
269
        // Flush large buffers immediately to keep memory bounded
270
20
        if (output_buffers[p]->allocated_bytes() >= MAX_BATCH_BYTES) {
271
0
            RETURN_IF_ERROR(_flush_buffer(state, p, output_buffers[p]));
272
0
        }
273
20
    }
274
275
8
    return Status::OK();
276
8
}
277
278
Status SpillRepartitioner::_route_block_by_columns(
279
        RuntimeState* state, Block& block,
280
11
        std::vector<std::unique_ptr<MutableBlock>>& output_buffers) {
281
11
    const auto rows = block.rows();
282
11
    if (rows == 0) {
283
0
        return Status::OK();
284
0
    }
285
286
    // Compute CRC32 hash on key columns
287
11
    std::vector<uint32_t> hash_vals(rows, 0);
288
11
    auto* __restrict hashes = hash_vals.data();
289
22
    for (size_t j = 0; j < _key_column_indices.size(); ++j) {
290
11
        auto col_idx = _key_column_indices[j];
291
11
        DCHECK_LT(col_idx, block.columns());
292
11
        const auto& column = block.get_by_position(col_idx).column;
293
11
        column->update_crcs_with_value(hashes, _key_data_types[j]->get_primitive_type(),
294
11
                                       static_cast<uint32_t>(rows));
295
11
    }
296
297
    // Map hash values to output channels with level-aware mixing.
298
641
    for (size_t i = 0; i < rows; ++i) {
299
630
        hashes[i] = _map_hash_to_partition(hashes[i]);
300
630
    }
301
302
    // Build per-partition row index lists
303
11
    std::vector<std::vector<uint32_t>> partition_row_indexes(_fanout);
304
641
    for (uint32_t i = 0; i < rows; ++i) {
305
630
        partition_row_indexes[hashes[i]].emplace_back(i);
306
630
    }
307
308
    // Scatter rows into per-partition buffers
309
55
    for (int p = 0; p < _fanout; ++p) {
310
44
        if (partition_row_indexes[p].empty()) {
311
0
            continue;
312
0
        }
313
314
44
        if (!output_buffers[p]) {
315
44
            output_buffers[p] = MutableBlock::create_unique(block.clone_empty());
316
44
        }
317
318
44
        RETURN_IF_ERROR(output_buffers[p]->add_rows(
319
44
                &block, partition_row_indexes[p].data(),
320
44
                partition_row_indexes[p].data() + partition_row_indexes[p].size()));
321
322
44
        if (output_buffers[p]->allocated_bytes() >= MAX_BATCH_BYTES) {
323
0
            RETURN_IF_ERROR(_flush_buffer(state, p, output_buffers[p]));
324
0
        }
325
44
    }
326
327
11
    return Status::OK();
328
11
}
329
330
Status SpillRepartitioner::_flush_buffer(RuntimeState* state, int partition_idx,
331
62
                                         std::unique_ptr<MutableBlock>& buffer) {
332
62
    if (!buffer || buffer->rows() == 0) {
333
0
        return Status::OK();
334
0
    }
335
62
    DCHECK(partition_idx < _fanout && _output_writers[partition_idx]);
336
62
    if (UNLIKELY(partition_idx >= _fanout || !_output_writers[partition_idx])) {
337
0
        return Status::InternalError(
338
0
                "SpillRepartitioner output writer is not initialized for partition {}",
339
0
                partition_idx);
340
0
    }
341
62
    auto out_block = buffer->to_block();
342
62
    buffer.reset();
343
62
    return _output_writers[partition_idx]->write_block(state, out_block);
344
62
}
345
346
Status SpillRepartitioner::_flush_all_buffers(
347
        RuntimeState* state, std::vector<std::unique_ptr<MutableBlock>>& output_buffers,
348
17
        bool force) {
349
109
    for (int i = 0; i < _fanout; ++i) {
350
92
        if (!output_buffers[i] || output_buffers[i]->rows() == 0) {
351
30
            continue;
352
30
        }
353
62
        if (force || output_buffers[i]->allocated_bytes() >= MAX_BATCH_BYTES) {
354
62
            RETURN_IF_ERROR(_flush_buffer(state, i, output_buffers[i]));
355
62
        }
356
62
    }
357
17
    return Status::OK();
358
17
}
359
360
650
uint32_t SpillRepartitioner::_map_hash_to_partition(uint32_t hash) const {
361
650
    DCHECK_GT(_fanout, 0);
362
    // Use a level-dependent salt so each repartition level has a different
363
    // projection from hash-space to partition-space.
364
650
    constexpr uint32_t LEVEL_SALT_BASE = 0x9E3779B9U;
365
650
    auto salt = static_cast<uint32_t>(_repartition_level + 1) * LEVEL_SALT_BASE;
366
650
    auto mixed = crc32c_shuffle_mix(hash ^ salt);
367
650
    return ((mixed >> 16) | (mixed << 16)) % static_cast<uint32_t>(_fanout);
368
650
}
369
370
#include "common/compile_check_end.h"
371
} // namespace doris