be/src/exec/spill/spill_repartitioner.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstddef> |
21 | | #include <cstdint> |
22 | | #include <memory> |
23 | | #include <vector> |
24 | | |
25 | | #include "common/status.h" |
26 | | #include "core/data_type/data_type.h" |
27 | | #include "exec/spill/spill_file.h" |
28 | | #include "exec/spill/spill_file_reader.h" |
29 | | #include "exec/spill/spill_file_writer.h" |
30 | | |
31 | | namespace doris { |
32 | | class RuntimeState; |
33 | | class RuntimeProfile; |
34 | | |
35 | | class Block; |
36 | | class PartitionerBase; |
37 | | |
38 | | /// SpillRepartitioner reads data from an input SpillFile and redistributes it |
39 | | /// into FANOUT output SpillFiles by computing hash on key columns. |
40 | | /// |
41 | | /// This is the core building block for multi-level spill partitioning used by both |
42 | | /// Hash Join and Aggregation operators. |
43 | | /// |
44 | | /// Two modes of operation: |
45 | | /// 1. Partitioner mode (init): Uses a PartitionerBase with expression contexts to |
46 | | /// compute hash. Suitable for Hash Join where blocks match the child's row descriptor. |
47 | | /// 2. Column-index mode (init_with_key_columns): Computes CRC32 hash directly on |
48 | | /// specified column indices. Suitable for Aggregation where spill blocks have a |
49 | | /// different schema (key columns at fixed positions 0..N-1). |
50 | | /// |
51 | | /// For repartitioning, hash computation and final channel mapping are separated: |
52 | | /// - a partitioner can provide either direct channel ids or raw hash values |
53 | | /// (e.g. SpillRePartitionChannelIds returns raw hash), |
54 | | /// - SpillRepartitioner then applies the final channel mapping strategy. |
55 | | /// This keeps repartition policy centralized and allows level-aware mapping. |
56 | | /// |
57 | | /// Processing is incremental: each call to repartition() processes up to MAX_BATCH_BYTES |
58 | | /// (32 MB) of data and then returns, allowing the pipeline scheduler to yield and |
59 | | /// re-schedule. The caller should loop calling repartition() until `done` is true. |
60 | | /// |
61 | | /// Usage pattern: |
62 | | /// // 1. Initialize |
63 | | /// repartitioner.init(...) or repartitioner.init_with_key_columns(...) |
64 | | /// // 2. Create output files and set up writers |
65 | | /// SpillRepartitioner::create_output_spill_files(state, node_id, label_prefix, fanout, output_files); |
66 | | /// repartitioner.setup_output(state, output_files); |
67 | | /// // 3. Route blocks and/or repartition files |
68 | | /// repartitioner.route_block(state, block); // from hash table |
69 | | /// repartitioner.repartition(state, input_file, &done); // from spill file; call repeatedly until done is true |
70 | | /// // 4. Finalize |
71 | | /// repartitioner.finalize(); |
72 | | class SpillRepartitioner { |
73 | | public: |
74 | | static constexpr int MAX_DEPTH = 8; |
75 | | static constexpr size_t MAX_BATCH_BYTES = 32 * 1024 * 1024; // 32 MB: per-call byte budget after which repartition() returns so the caller can yield |
76 | | |
77 | 67 | SpillRepartitioner() = default; |
78 | 67 | ~SpillRepartitioner() = default; |
79 | | |
80 | | /// Initialize partitioner mode (for Hash Join). Takes ownership of `partitioner` (unique_ptr passed by value). |
81 | | void init(std::unique_ptr<PartitionerBase> partitioner, RuntimeProfile* profile, int fanout, |
82 | | int repartition_level); |
83 | | |
84 | | /// Initialize column-index mode (for Aggregation): keys are hashed from the block columns at `key_column_indices`; `key_data_types` presumably holds the matching type per key -- confirm in the .cpp. |
85 | | void init_with_key_columns(std::vector<size_t> key_column_indices, |
86 | | std::vector<DataTypePtr> key_data_types, RuntimeProfile* profile, |
87 | | int fanout, int repartition_level); |
88 | | |
89 | | /// Set up output SpillFiles and create persistent writers for them. |
90 | | /// Must be called before repartition() or route_block(). The vector is taken by reference; NOTE(review): a pointer to it appears to be kept in _output_spill_files for finalize(), so it presumably must stay alive until then -- confirm. |
91 | | Status setup_output(RuntimeState* state, std::vector<SpillFileSPtr>& output_spill_files); |
92 | | |
93 | | /// Repartition data from input_spill_file into output files. |
94 | | /// The input reader is created lazily and persists across yield calls. |
95 | | /// `done` is an out-parameter; call repeatedly until *done == true. |
96 | | Status repartition(RuntimeState* state, SpillFileSPtr& input_spill_file, bool* done); |
97 | | |
98 | | /// Repartition data using an existing reader (continues from its current |
99 | | /// position). Useful when the caller has already partially read the file |
100 | | /// and wants to repartition only the remaining data without re-reading |
101 | | /// from the beginning. Ownership of the reader is transferred on completion. NOTE(review): the direction of that transfer is not visible in this header (reader is a non-const reference) -- confirm in the .cpp. |
102 | | /// `done` is an out-parameter; call repeatedly until *done == true. |
103 | | Status repartition(RuntimeState* state, SpillFileReaderSPtr& reader, bool* done); |
104 | | |
105 | | /// Route a single in-memory block into output files via persistent writers. Requires setup_output() to have been called first. |
106 | | Status route_block(RuntimeState* state, Block& block); |
107 | | |
108 | | /// Finalize: close all output writers and update SpillFile stats. |
109 | | /// Also resets internal reader state. |
110 | | Status finalize(); |
111 | | |
112 | | /// Create `fanout` output SpillFiles registered with the SpillFileManager; the files are handed back through the `output_spill_files` out-parameter. |
113 | | static Status create_output_spill_files(RuntimeState* state, int node_id, |
114 | | const std::string& label_prefix, int fanout, |
115 | | std::vector<SpillFileSPtr>& output_spill_files); |
116 | | |
117 | 1 | int fanout() const { return _fanout; } |
118 | | |
119 | | private: |
120 | | /// Route a block using the partitioner (Hash Join mode). |
121 | | Status _route_block(RuntimeState* state, Block& block, |
122 | | std::vector<std::unique_ptr<MutableBlock>>& output_buffers); |
123 | | |
124 | | /// Route a block using direct column-index hashing (Aggregation mode). |
125 | | Status _route_block_by_columns(RuntimeState* state, Block& block, |
126 | | std::vector<std::unique_ptr<MutableBlock>>& output_buffers); |
127 | | |
128 | | Status _flush_buffer(RuntimeState* state, int partition_idx, |
129 | | std::unique_ptr<MutableBlock>& buffer); |
130 | | |
131 | | Status _flush_all_buffers(RuntimeState* state, |
132 | | std::vector<std::unique_ptr<MutableBlock>>& output_buffers, |
133 | | bool force); |
134 | | |
135 | | uint32_t _map_hash_to_partition(uint32_t hash) const; |
136 | | |
137 | | // Partitioner mode (used by Hash Join); owned by this object |
138 | | std::unique_ptr<PartitionerBase> _partitioner; |
139 | | |
140 | | // Column-index mode (used by Aggregation) |
141 | | std::vector<size_t> _key_column_indices; |
142 | | std::vector<DataTypePtr> _key_data_types; |
143 | | bool _use_column_index_mode = false; |
144 | | |
145 | | RuntimeProfile::Counter* _repartition_timer = nullptr; |
146 | | RuntimeProfile::Counter* _repartition_rows = nullptr; |
147 | | RuntimeProfile* _operator_profile = nullptr; |
148 | | int _fanout = 8; |
149 | | int _repartition_level = 0; |
150 | | |
151 | | // ── Persistent state across repartition/route_block calls ────── |
152 | | // Output writers (one per partition), created by setup_output() |
153 | | std::vector<SpillFileWriterSPtr> _output_writers; |
154 | | // Non-owning pointer to the caller's output SpillFiles vector (used by finalize) |
155 | | std::vector<SpillFileSPtr>* _output_spill_files = nullptr; |
156 | | // Input reader for repartition(), persists across yield calls |
157 | | SpillFileReaderSPtr _input_reader; |
158 | | SpillFileSPtr _current_input_file; |
159 | | }; |
160 | | |
161 | | } // namespace doris |