Coverage Report

Created: 2026-03-16 17:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_repartitioner.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstddef>
21
#include <cstdint>
22
#include <memory>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "core/data_type/data_type.h"
27
#include "exec/spill/spill_file.h"
28
#include "exec/spill/spill_file_reader.h"
29
#include "exec/spill/spill_file_writer.h"
30
31
namespace doris {
32
// Forward declarations of project types referenced by SpillRepartitioner below.
// NOTE(review): RuntimeProfile::Counter is named further down, which needs the
// complete RuntimeProfile definition; presumably one of the headers included
// above provides it - confirm.
// NOTE(review): MutableBlock is also used below but is not declared here;
// presumably it comes from an included header - confirm.
class RuntimeState;
33
class RuntimeProfile;
34
35
class Block;
36
class PartitionerBase;
37
38
/// SpillRepartitioner reads data from an input SpillFile and redistributes it
39
/// into FANOUT output SpillFiles by computing hash on key columns.
40
///
41
/// This is the core building block for multi-level spill partitioning used by both
42
/// Hash Join and Aggregation operators.
43
///
44
/// Two modes of operation:
45
/// 1. Partitioner mode (init): Uses a PartitionerBase with expression contexts to
46
///    compute hash. Suitable for Hash Join where blocks match the child's row descriptor.
47
/// 2. Column-index mode (init_with_key_columns): Computes CRC32 hash directly on
48
///    specified column indices. Suitable for Aggregation where spill blocks have a
49
///    different schema (key columns at fixed positions 0..N-1).
50
///
51
/// For repartitioning, hash computation and final channel mapping are separated:
52
/// - a partitioner can provide either direct channel ids or raw hash values
53
///   (e.g. SpillRePartitionChannelIds returns raw hash),
54
/// - SpillRepartitioner then applies the final channel mapping strategy.
55
/// This keeps repartition policy centralized and allows level-aware mapping.
56
///
57
/// Processing is incremental: each call to repartition() processes up to MAX_BATCH_BYTES
58
/// (32 MB) of data and then returns, allowing the pipeline scheduler to yield and
59
/// re-schedule. The caller should loop calling repartition() until `done` is true.
60
///
61
/// Usage pattern:
62
///   // 1. Initialize
63
///   repartitioner.init(...) or repartitioner.init_with_key_columns(...)
64
///   // 2. Create output files and set up writers
65
///   SpillRepartitioner::create_output_spill_files(state, ..., fanout, output_files);
66
///   repartitioner.setup_output(state, output_files);
67
///   // 3. Route blocks and/or repartition files
68
///   repartitioner.route_block(state, block);         // from hash table
69
///   repartitioner.repartition(state, input_file, &done);  // from spill file
70
///   // 4. Finalize
71
///   repartitioner.finalize();
72
class SpillRepartitioner {
73
public:
74
    // NOTE(review): presumably the maximum repartition level; it is not referenced
    // anywhere else in this header - confirm against the implementation.
    static constexpr int MAX_DEPTH = 8;
75
    static constexpr size_t MAX_BATCH_BYTES = 32 * 1024 * 1024; // 32 MB yield threshold
76
77
67
    // NOTE(review): the destructor is defaulted inline while _partitioner is a
    // std::unique_ptr to the forward-declared PartitionerBase, so any translation
    // unit that destroys a SpillRepartitioner needs the complete PartitionerBase
    // type - confirm that includers provide it.
    SpillRepartitioner() = default;
78
67
    ~SpillRepartitioner() = default;
79
80
    /// Initialize the repartitioner with a partitioner (for Hash Join).
    /// `partitioner` is passed as std::unique_ptr by value, so the caller hands
    /// over ownership. `profile`, `fanout` and `repartition_level` presumably
    /// populate _operator_profile, _fanout and _repartition_level - confirm.
81
    void init(std::unique_ptr<PartitionerBase> partitioner, RuntimeProfile* profile, int fanout,
82
              int repartition_level);
83
84
    /// Initialize the repartitioner with explicit key column indices (for Aggregation).
    /// Both vectors are taken by value. NOTE(review): presumably
    /// key_data_types[i] describes the column at key_column_indices[i] - confirm.
85
    void init_with_key_columns(std::vector<size_t> key_column_indices,
86
                               std::vector<DataTypePtr> key_data_types, RuntimeProfile* profile,
87
                               int fanout, int repartition_level);
88
89
    /// Set up output SpillFiles and create persistent writers for them.
90
    /// Must be called before repartition() or route_block().
    /// NOTE(review): the class stores a raw pointer to a caller-owned vector
    /// (_output_spill_files, "for finalize"); presumably it is this argument, in
    /// which case the vector must stay alive until finalize() - confirm.
91
    Status setup_output(RuntimeState* state, std::vector<SpillFileSPtr>& output_spill_files);
92
93
    /// Repartition data from input_spill_file into output files.
94
    /// The input reader is created lazily and persists across yield calls.
95
    /// Call repeatedly until done == true.
96
    Status repartition(RuntimeState* state, SpillFileSPtr& input_spill_file, bool* done);
97
98
    /// Repartition data using an existing reader (continues from its current
99
    /// position). Useful when the caller has already partially read the file
100
    /// and wants to repartition only the remaining data without re-reading
101
    /// from the beginning. Ownership of the reader is transferred on completion.
    /// NOTE(review): `reader` is taken by non-const reference; which side holds
    /// it after completion is not visible in this header - confirm.
102
    /// Call repeatedly until done == true.
103
    Status repartition(RuntimeState* state, SpillFileReaderSPtr& reader, bool* done);
104
105
    /// Route a single in-memory block into output files via persistent writers.
106
    Status route_block(RuntimeState* state, Block& block);
107
108
    /// Finalize: close all output writers and update SpillFile stats.
109
    /// Also resets internal reader state.
110
    Status finalize();
111
112
    /// Create FANOUT output SpillFiles registered with the SpillFileManager.
    /// Static: can be called without a SpillRepartitioner instance. Note the
    /// parameter order: `fanout` comes before `output_spill_files`.
    /// NOTE(review): std::string is used here but <string> is not included
    /// directly by this header; it presumably arrives transitively - confirm.
113
    static Status create_output_spill_files(RuntimeState* state, int node_id,
114
                                            const std::string& label_prefix, int fanout,
115
                                            std::vector<SpillFileSPtr>& output_spill_files);
116
117
1
    /// Returns the current fan-out value (_fanout, default-initialized to 8).
    int fanout() const { return _fanout; }
118
119
private:
120
    /// Route a block using the partitioner (Hash Join mode).
121
    Status _route_block(RuntimeState* state, Block& block,
122
                        std::vector<std::unique_ptr<MutableBlock>>& output_buffers);
123
124
    /// Route a block using direct column-index hashing (Aggregation mode).
125
    Status _route_block_by_columns(RuntimeState* state, Block& block,
126
                                   std::vector<std::unique_ptr<MutableBlock>>& output_buffers);
127
128
    // NOTE(review): presumably writes `buffer` to the output writer for
    // `partition_idx` - confirm in the implementation.
    Status _flush_buffer(RuntimeState* state, int partition_idx,
129
                         std::unique_ptr<MutableBlock>& buffer);
130
131
    // NOTE(review): presumably flushes each entry of `output_buffers`; the exact
    // effect of `force` is not visible in this header - confirm.
    Status _flush_all_buffers(RuntimeState* state,
132
                              std::vector<std::unique_ptr<MutableBlock>>& output_buffers,
133
                              bool force);
134
135
    // Maps a hash value to an output partition index.
    // NOTE(review): presumably depends on _fanout and _repartition_level
    // (level-aware mapping) - confirm in the implementation.
    uint32_t _map_hash_to_partition(uint32_t hash) const;
136
137
    // Partitioner mode (used by Hash Join)
138
    std::unique_ptr<PartitionerBase> _partitioner;
139
140
    // Column-index mode (used by Aggregation)
141
    std::vector<size_t> _key_column_indices;
142
    std::vector<DataTypePtr> _key_data_types;
    // Defaults to false, i.e. partitioner mode unless something switches it.
143
    bool _use_column_index_mode = false;
144
145
    // Profile handles; all start as nullptr.
    // NOTE(review): presumably set up from the `profile` argument of the init
    // functions - confirm.
    RuntimeProfile::Counter* _repartition_timer = nullptr;
146
    RuntimeProfile::Counter* _repartition_rows = nullptr;
147
    RuntimeProfile* _operator_profile = nullptr;
    // Default fan-out when no init function has supplied another value.
148
    int _fanout = 8;
149
    int _repartition_level = 0;
150
151
    // ── Persistent state across repartition/route_block calls ──────
152
    // Output writers (one per partition), created by setup_output()
153
    std::vector<SpillFileWriterSPtr> _output_writers;
154
    // Pointer to caller's output SpillFiles vector (for finalize)
    // Non-owning: the pointed-to vector belongs to the caller.
155
    std::vector<SpillFileSPtr>* _output_spill_files = nullptr;
156
    // Input reader for repartition(), persists across yield calls
157
    SpillFileReaderSPtr _input_reader;
    // NOTE(review): presumably the SpillFile that _input_reader is reading - confirm.
158
    SpillFileSPtr _current_input_file;
159
};
160
161
} // namespace doris