Coverage Report

Created: 2025-06-23 17:55

/root/doris/be/src/pipeline/dependency.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <sqltypes.h>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <iterator>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

#include "common/logging.h"
#include "concurrentqueue.h"
#include "gutil/integral_types.h"
#include "pipeline/common/agg_utils.h"
#include "pipeline/common/join_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/spill/spill_stream.h"
41
42
// Forward declarations so this header does not need the full vectorized-engine definitions.
namespace doris::vectorized {
class VSlotRef;
class AggFnEvaluator;
} // namespace doris::vectorized
46
47
namespace doris::pipeline {
48
49
class Dependency;
class PipelineTask;
struct BasicSharedState;
using DependencySPtr = std::shared_ptr<Dependency>;
class LocalExchangeSourceLocalState;

// Thresholds of 60 * 10^9 and 30 * 10^9. NOTE(review): presumably nanoseconds, matching
// MonotonicStopWatch::elapsed_time() used by Dependency's watcher -- confirm at call sites.
// `inline constexpr` gives one entity program-wide instead of a per-TU copy in this header.
inline constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
inline constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
// The logging period must be shorter than the "slow dependency" threshold.
static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
58
59
struct BasicSharedState {
60
    ENABLE_FACTORY_CREATOR(BasicSharedState)
61
62
    template <class TARGET>
63
0
    TARGET* cast() {
64
0
        DCHECK(dynamic_cast<TARGET*>(this))
65
0
                << " Mismatch type! Current type is " << typeid(*this).name()
66
0
                << " and expect type is" << typeid(TARGET).name();
67
0
        return reinterpret_cast<TARGET*>(this);
68
0
    }
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_19HashJoinSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_30PartitionedHashJoinSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_15SortSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_20SpillSortSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_25NestedLoopJoinSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_19AnalyticSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_14AggSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_25PartitionedAggSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_16UnionSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_28PartitionSortNodeSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_20MultiCastSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_14SetSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_24LocalExchangeSharedStateEEEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castIS1_EEPT_v
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_16CacheSharedStateEEEPT_v
69
    template <class TARGET>
70
    const TARGET* cast() const {
71
        DCHECK(dynamic_cast<const TARGET*>(this))
72
                << " Mismatch type! Current type is " << typeid(*this).name()
73
                << " and expect type is" << typeid(TARGET).name();
74
        return reinterpret_cast<const TARGET*>(this);
75
    }
76
    std::vector<DependencySPtr> source_deps;
77
    std::vector<DependencySPtr> sink_deps;
78
    int id = 0;
79
    std::set<int> related_op_ids;
80
81
1
    virtual ~BasicSharedState() = default;
82
83
    Dependency* create_source_dependency(int operator_id, int node_id, std::string name);
84
85
    Dependency* create_sink_dependency(int dest_id, int node_id, std::string name);
86
};
87
88
class Dependency : public std::enable_shared_from_this<Dependency> {
89
public:
90
    ENABLE_FACTORY_CREATOR(Dependency);
91
    Dependency(int id, int node_id, std::string name)
92
4
            : _id(id), _node_id(node_id), _name(std::move(name)), _ready(false) {}
93
    Dependency(int id, int node_id, std::string name, bool ready)
94
0
            : _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {}
95
4
    virtual ~Dependency() = default;
96
97
0
    [[nodiscard]] int id() const { return _id; }
98
4
    [[nodiscard]] virtual std::string name() const { return _name; }
99
0
    BasicSharedState* shared_state() { return _shared_state; }
100
0
    void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; }
101
    virtual std::string debug_string(int indentation_level = 0);
102
0
    bool ready() const { return _ready; }
103
104
    // Start the watcher. We use it to count how long this dependency block the current pipeline task.
105
0
    void start_watcher() { _watcher.start(); }
106
8
    [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
107
108
    // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
109
    [[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
110
    // Notify downstream pipeline tasks this dependency is ready.
111
    void set_ready();
112
0
    void set_ready_to_read() {
113
0
        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
114
0
        _shared_state->source_deps.front()->set_ready();
115
0
    }
116
0
    void set_block_to_read() {
117
0
        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
118
0
        _shared_state->source_deps.front()->block();
119
0
    }
120
0
    void set_ready_to_write() {
121
0
        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
122
0
        _shared_state->sink_deps.front()->set_ready();
123
0
    }
124
0
    void set_block_to_write() {
125
0
        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
126
0
        _shared_state->sink_deps.front()->block();
127
0
    }
128
129
    // Notify downstream pipeline tasks this dependency is blocked.
130
0
    void block() {
131
0
        if (_always_ready) {
132
0
            return;
133
0
        }
134
0
        std::unique_lock<std::mutex> lc(_always_ready_lock);
135
0
        if (_always_ready) {
136
0
            return;
137
0
        }
138
0
        _ready = false;
139
0
    }
140
141
0
    void set_always_ready() {
142
0
        if (_always_ready) {
143
0
            return;
144
0
        }
145
0
        std::unique_lock<std::mutex> lc(_always_ready_lock);
146
0
        if (_always_ready) {
147
0
            return;
148
0
        }
149
0
        _always_ready = true;
150
0
        set_ready();
151
0
    }
152
153
protected:
154
    void _add_block_task(PipelineTask* task);
155
156
    const int _id;
157
    const int _node_id;
158
    const std::string _name;
159
    std::atomic<bool> _ready;
160
161
    BasicSharedState* _shared_state = nullptr;
162
    MonotonicStopWatch _watcher;
163
164
    std::mutex _task_lock;
165
    std::vector<PipelineTask*> _blocked_task;
166
167
    // If `_always_ready` is true, `block()` will never block tasks.
168
    std::atomic<bool> _always_ready = false;
169
    std::mutex _always_ready_lock;
170
};
171
172
struct FakeSharedState final : public BasicSharedState {
173
    ENABLE_FACTORY_CREATOR(FakeSharedState)
174
};
175
176
struct CountedFinishDependency final : public Dependency {
177
public:
178
    using SharedState = FakeSharedState;
179
    CountedFinishDependency(int id, int node_id, std::string name)
180
0
            : Dependency(id, node_id, name, true) {}
181
182
0
    void add() {
183
0
        std::unique_lock<std::mutex> l(_mtx);
184
0
        if (!_counter) {
185
0
            block();
186
0
        }
187
0
        _counter++;
188
0
    }
189
190
0
    void sub() {
191
0
        std::unique_lock<std::mutex> l(_mtx);
192
0
        _counter--;
193
0
        if (!_counter) {
194
0
            set_ready();
195
0
        }
196
0
    }
197
198
    std::string debug_string(int indentation_level = 0) override;
199
200
private:
201
    std::mutex _mtx;
202
    uint32_t _counter = 0;
203
};
204
205
class RuntimeFilterDependency;
206
struct RuntimeFilterTimerQueue;
207
class RuntimeFilterTimer {
208
public:
209
    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
210
                       std::shared_ptr<Dependency> parent, bool force_wait_timeout = false)
211
            : _parent(std::move(parent)),
212
              _registration_time(registration_time),
213
              _wait_time_ms(wait_time_ms),
214
0
              _force_wait_timeout(force_wait_timeout) {}
215
216
    // Called by runtime filter producer.
217
    void call_ready();
218
219
    // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout.
220
    void call_timeout();
221
222
0
    int64_t registration_time() const { return _registration_time; }
223
0
    int32_t wait_time_ms() const { return _wait_time_ms; }
224
225
    void set_local_runtime_filter_dependencies(
226
0
            const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps) {
227
0
        _local_runtime_filter_dependencies = deps;
228
0
    }
229
230
    bool should_be_check_timeout();
231
232
0
    bool force_wait_timeout() { return _force_wait_timeout; }
233
234
private:
235
    friend struct RuntimeFilterTimerQueue;
236
    std::shared_ptr<Dependency> _parent = nullptr;
237
    std::vector<std::shared_ptr<RuntimeFilterDependency>> _local_runtime_filter_dependencies;
238
    std::mutex _lock;
239
    int64_t _registration_time;
240
    const int32_t _wait_time_ms;
241
    // true only for group_commit_scan_operator
242
    bool _force_wait_timeout;
243
};
244
245
struct RuntimeFilterTimerQueue {
246
    constexpr static int64_t interval = 10;
247
0
    void run() { _thread.detach(); }
248
    void start();
249
250
0
    void stop() {
251
0
        _stop = true;
252
0
        cv.notify_all();
253
0
        wait_for_shutdown();
254
0
    }
255
256
0
    void wait_for_shutdown() const {
257
0
        while (!_shutdown) {
258
0
            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
259
0
        }
260
0
    }
261
262
0
    ~RuntimeFilterTimerQueue() = default;
263
0
    RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
264
0
    void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) {
265
0
        std::unique_lock<std::mutex> lc(_que_lock);
266
0
        _que.insert(_que.end(), filter.begin(), filter.end());
267
0
        cv.notify_all();
268
0
    }
269
270
    std::thread _thread;
271
    std::condition_variable cv;
272
    std::mutex cv_m;
273
    std::mutex _que_lock;
274
    std::atomic_bool _stop = false;
275
    std::atomic_bool _shutdown = false;
276
    std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> _que;
277
};
278
279
class RuntimeFilterDependency final : public Dependency {
280
public:
281
    RuntimeFilterDependency(int id, int node_id, std::string name, IRuntimeFilter* runtime_filter)
282
0
            : Dependency(id, node_id, name), _runtime_filter(runtime_filter) {}
283
    std::string debug_string(int indentation_level = 0) override;
284
285
    Dependency* is_blocked_by(PipelineTask* task) override;
286
287
private:
288
    const IRuntimeFilter* _runtime_filter = nullptr;
289
};
290
291
struct AggSharedState : public BasicSharedState {
292
    ENABLE_FACTORY_CREATOR(AggSharedState)
293
public:
294
1
    AggSharedState() {
295
1
        agg_data = std::make_unique<AggregatedDataVariants>();
296
1
        agg_arena_pool = std::make_unique<vectorized::Arena>();
297
1
    }
298
1
    ~AggSharedState() override {
299
1
        if (!probe_expr_ctxs.empty()) {
300
0
            _close_with_serialized_key();
301
1
        } else {
302
1
            _close_without_key();
303
1
        }
304
1
    }
305
306
    Status reset_hash_table();
307
308
    bool do_limit_filter(vectorized::Block* block, size_t num_rows,
309
                         const std::vector<int>* key_locs = nullptr);
310
    void build_limit_heap(size_t hash_table_size);
311
312
    // We should call this function only at 1st phase.
313
    // 1st phase: is_merge=true, only have one SlotRef.
314
    // 2nd phase: is_merge=false, maybe have multiple exprs.
315
    static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
316
317
    AggregatedDataVariantsUPtr agg_data = nullptr;
318
    std::unique_ptr<AggregateDataContainer> aggregate_data_container;
319
    ArenaUPtr agg_arena_pool;
320
    std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators;
321
    // group by k1,k2
322
    vectorized::VExprContextSPtrs probe_expr_ctxs;
323
    size_t input_num_rows = 0;
324
    std::vector<vectorized::AggregateDataPtr> values;
325
    /// The total size of the row from the aggregate functions.
326
    size_t total_size_of_aggregate_states = 0;
327
    size_t align_aggregate_states = 1;
328
    /// The offset to the n-th aggregate function in a row of aggregate functions.
329
    vectorized::Sizes offsets_of_aggregate_states;
330
    std::vector<size_t> make_nullable_keys;
331
332
    bool agg_data_created_without_key = false;
333
    bool enable_spill = false;
334
    bool reach_limit = false;
335
336
    int64_t limit = -1;
337
    bool do_sort_limit = false;
338
    vectorized::MutableColumns limit_columns;
339
    int limit_columns_min = -1;
340
    vectorized::PaddedPODArray<uint8_t> need_computes;
341
    std::vector<uint8_t> cmp_res;
342
    std::vector<int> order_directions;
343
    std::vector<int> null_directions;
344
345
    struct HeapLimitCursor {
346
        HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns,
347
                        std::vector<int>& order_directions, std::vector<int>& null_directions)
348
                : _row_id(row_id),
349
                  _limit_columns(limit_columns),
350
                  _order_directions(order_directions),
351
6
                  _null_directions(null_directions) {}
352
353
        HeapLimitCursor(const HeapLimitCursor& other) = default;
354
355
        HeapLimitCursor(HeapLimitCursor&& other) noexcept
356
                : _row_id(other._row_id),
357
                  _limit_columns(other._limit_columns),
358
                  _order_directions(other._order_directions),
359
30
                  _null_directions(other._null_directions) {}
360
361
0
        HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept {
362
0
            _row_id = other._row_id;
363
0
            return *this;
364
0
        }
365
366
23
        HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept {
367
23
            _row_id = other._row_id;
368
23
            return *this;
369
23
        }
370
371
14
        bool operator<(const HeapLimitCursor& rhs) const {
372
14
            for (int i = 0; i < _limit_columns.size(); ++i) {
373
14
                const auto& _limit_column = _limit_columns[i];
374
14
                auto res = _limit_column->compare_at(_row_id, rhs._row_id, *_limit_column,
375
14
                                                     _null_directions[i]) *
376
14
                           _order_directions[i];
377
14
                if (res < 0) {
378
5
                    return true;
379
9
                } else if (res > 0) {
380
9
                    return false;
381
9
                }
382
14
            }
383
0
            return false;
384
14
        }
385
386
        int _row_id;
387
        vectorized::MutableColumns& _limit_columns;
388
        std::vector<int>& _order_directions;
389
        std::vector<int>& _null_directions;
390
    };
391
392
    std::priority_queue<HeapLimitCursor> limit_heap;
393
394
    // Refresh the top limit heap with a new row
395
    void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns);
396
397
private:
398
    vectorized::MutableColumns _get_keys_hash_table();
399
400
0
    void _close_with_serialized_key() {
401
0
        std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
402
                                             // Do nothing
403
0
                                         },
404
0
                                         [&](auto& agg_method) -> void {
405
0
                                             auto& data = *agg_method.hash_table;
406
0
                                             data.for_each_mapped([&](auto& mapped) {
407
0
                                                 if (mapped) {
408
0
                                                     static_cast<void>(_destroy_agg_status(mapped));
409
0
                                                     mapped = nullptr;
410
0
                                                 }
411
0
                                             });
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vELb0EEEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc11DefaultHashIhvELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc11DefaultHashItvELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPc9AllocatorILb1ELb1ELb0E22DefaultMemoryAllocatorEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_ELb0EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc14HashMixWrapperISA_9HashCRC32ISA_EELb0EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc11DefaultHashIhvELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc11DefaultHashItvELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_ELb0EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc14HashMixWrapperISB_9HashCRC32ISB_EELb0EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPc9AllocatorILb1ELb1ELb0E22DefaultMemoryAllocatorEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImELb0EELb0EEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImELb0EELb1EEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_ELb0EELb0EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_ELb0EELb1EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_ELb0EELb0EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_ELb0EELb1EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_ELb0EELb0EEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_ELb0EELb1EEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EELb0EEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EELb1EEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb0EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb1EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb0EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb1EEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc14HashMixWrapperIS9_9HashCRC32IS9_EELb0EELb0EEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc14HashMixWrapperIS9_9HashCRC32IS9_EELb0EELb1EEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
412
0
                                             if (data.has_null_key_data()) {
413
0
                                                 auto st = _destroy_agg_status(
414
0
                                                         data.template get_null_key_data<
415
0
                                                                 vectorized::AggregateDataPtr>());
416
0
                                                 if (!st) {
417
0
                                                     throw Exception(st.code(), st.to_string());
418
0
                                                 }
419
0
                                             }
420
0
                                         }},
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc11DefaultHashIhvELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc11DefaultHashItvELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPc9AllocatorILb1ELb1ELb0E22DefaultMemoryAllocatorEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_ELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc14HashMixWrapperISA_9HashCRC32ISA_EELb0EEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc11DefaultHashIhvELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc11DefaultHashItvELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_ELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc14HashMixWrapperISB_9HashCRC32ISB_EELb0EEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPc9AllocatorILb1ELb1ELb0E22DefaultMemoryAllocatorEEEEEEEEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_ELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_ELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_ELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_ELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_ELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_ELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc14HashMixWrapperISB_9HashCRC32ISB_EELb0EELb1EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc14HashMixWrapperIS9_9HashCRC32IS9_EELb0EELb0EEEEEvS3_
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc14HashMixWrapperIS9_9HashCRC32IS9_EELb0EELb1EEEEEvS3_
421
0
                   agg_data->method_variant);
422
0
    }
423
424
1
    void _close_without_key() {
425
        //because prepare maybe failed, and couldn't create agg data.
426
        //but finally call close to destory agg data, if agg data has bitmapValue
427
        //will be core dump, it's not initialized
428
1
        if (agg_data_created_without_key) {
429
0
            static_cast<void>(_destroy_agg_status(agg_data->without_key));
430
0
            agg_data_created_without_key = false;
431
0
        }
432
1
    }
433
    Status _destroy_agg_status(vectorized::AggregateDataPtr data);
434
};
435
436
struct AggSpillPartition;
437
struct PartitionedAggSharedState : public BasicSharedState,
438
                                   public std::enable_shared_from_this<PartitionedAggSharedState> {
439
    ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
440
441
0
    PartitionedAggSharedState() = default;
442
0
    ~PartitionedAggSharedState() override = default;
443
444
    void init_spill_params(size_t spill_partition_count_bits);
445
446
    void close();
447
448
    AggSharedState* in_mem_shared_state = nullptr;
449
    std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
450
451
    size_t partition_count_bits;
452
    size_t partition_count;
453
    size_t max_partition_index;
454
    Status sink_status;
455
    bool is_spilled = false;
456
    std::atomic_bool is_closed = false;
457
    std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
458
459
0
    size_t get_partition_index(size_t hash_value) const {
460
0
        return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
461
0
    }
462
};
463
464
struct AggSpillPartition {
465
    static constexpr int64_t AGG_SPILL_FILE_SIZE = 1024 * 1024 * 1024; // 1G
466
467
0
    AggSpillPartition() = default;
468
469
    void close();
470
471
    Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
472
                            vectorized::SpillStreamSPtr& spilling_stream);
473
474
0
    Status flush_if_full() {
475
0
        DCHECK(spilling_stream_);
476
0
        Status status;
477
        // avoid small spill files
478
0
        if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
479
0
            status = spilling_stream_->spill_eof();
480
0
            spilling_stream_.reset();
481
0
        }
482
0
        return status;
483
0
    }
484
485
0
    Status finish_current_spilling(bool eos = false) {
486
0
        if (spilling_stream_) {
487
0
            if (eos || spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
488
0
                auto status = spilling_stream_->spill_eof();
489
0
                spilling_stream_.reset();
490
0
                return status;
491
0
            }
492
0
        }
493
0
        return Status::OK();
494
0
    }
495
496
    std::deque<vectorized::SpillStreamSPtr> spill_streams_;
497
    vectorized::SpillStreamSPtr spilling_stream_;
498
};
499
using AggSpillPartitionSPtr = std::shared_ptr<AggSpillPartition>;
500
struct SortSharedState : public BasicSharedState {
501
    ENABLE_FACTORY_CREATOR(SortSharedState)
502
public:
503
    std::unique_ptr<vectorized::Sorter> sorter;
504
};
505
506
struct SpillSortSharedState : public BasicSharedState,
507
                              public std::enable_shared_from_this<SpillSortSharedState> {
508
    ENABLE_FACTORY_CREATOR(SpillSortSharedState)
509
510
0
    SpillSortSharedState() = default;
511
0
    ~SpillSortSharedState() override = default;
512
513
    // This number specifies the maximum size of sub blocks
514
    static constexpr int SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
515
0
    void update_spill_block_batch_row_count(const vectorized::Block* block) {
516
0
        auto rows = block->rows();
517
0
        if (rows > 0 && 0 == avg_row_bytes) {
518
0
            avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows);
519
0
            spill_block_batch_row_count =
520
0
                    (SORT_BLOCK_SPILL_BATCH_BYTES + avg_row_bytes - 1) / avg_row_bytes;
521
0
            LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count;
522
0
        }
523
0
    }
524
    void close();
525
526
    SortSharedState* in_mem_shared_state = nullptr;
527
    bool enable_spill = false;
528
    bool is_spilled = false;
529
    std::atomic_bool is_closed = false;
530
    Status sink_status;
531
    std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
532
533
    std::deque<vectorized::SpillStreamSPtr> sorted_streams;
534
    size_t avg_row_bytes = 0;
535
    int spill_block_batch_row_count;
536
};
537
538
struct UnionSharedState : public BasicSharedState {
539
    ENABLE_FACTORY_CREATOR(UnionSharedState)
540
541
public:
542
0
    UnionSharedState(int child_count = 1) : data_queue(child_count), _child_count(child_count) {};
543
0
    int child_count() const { return _child_count; }
544
    DataQueue data_queue;
545
    const int _child_count;
546
};
547
548
struct CacheSharedState : public BasicSharedState {
549
    ENABLE_FACTORY_CREATOR(CacheSharedState)
550
public:
551
    DataQueue data_queue;
552
};
553
554
class MultiCastDataStreamer;
555
556
struct MultiCastSharedState : public BasicSharedState {
557
public:
558
    MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count);
559
    std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
560
};
561
562
/// A position inside a sequence of blocks, expressed both relative to a block
/// and as an overall position.
struct BlockRowPos {
    int64_t block_num {}; // which block the position is in
    int64_t row_num {};   // row inside that block
    int64_t pos {};       // pos = all blocks size + row_num

    /// Human-readable dump of the three fields, each prefixed by a tab.
    std::string debug_string() const {
        return "\t block_num: " + std::to_string(block_num) + "\t row_num: " +
               std::to_string(row_num) + "\t pos: " + std::to_string(pos);
    }
};
576
577
struct AnalyticSharedState : public BasicSharedState {
578
    ENABLE_FACTORY_CREATOR(AnalyticSharedState)
579
580
public:
581
0
    AnalyticSharedState() = default;
582
583
    int64_t current_row_position = 0;
584
    BlockRowPos partition_by_end;
585
    vectorized::VExprContextSPtrs partition_by_eq_expr_ctxs;
586
    int64_t input_total_rows = 0;
587
    BlockRowPos all_block_end;
588
    std::vector<vectorized::Block> input_blocks;
589
    bool input_eos = false;
590
    BlockRowPos found_partition_end;
591
    std::vector<int64_t> origin_cols;
592
    vectorized::VExprContextSPtrs order_by_eq_expr_ctxs;
593
    std::vector<int64_t> input_block_first_row_positions;
594
    std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;
595
596
    // TODO: maybe global?
597
    std::vector<int64_t> partition_by_column_idxs;
598
    std::vector<int64_t> ordey_by_column_idxs;
599
};
600
601
struct JoinSharedState : public BasicSharedState {
602
    // For some join case, we can apply a short circuit strategy
603
    // 1. _has_null_in_build_side = true
604
    // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
605
    bool _has_null_in_build_side = false;
606
    bool short_circuit_for_probe = false;
607
    // for some join, when build side rows is empty, we could return directly by add some additional null data in probe table.
608
    bool empty_right_table_need_probe_dispose = false;
609
    JoinOpVariants join_op_variants;
610
};
611
612
struct HashJoinSharedState : public JoinSharedState {
613
    ENABLE_FACTORY_CREATOR(HashJoinSharedState)
614
    // mark the join column whether support null eq
615
    std::vector<bool> is_null_safe_eq_join;
616
    // mark the build hash table whether it needs to store null value
617
    std::vector<bool> store_null_in_hash_table;
618
    std::shared_ptr<vectorized::Arena> arena = std::make_shared<vectorized::Arena>();
619
620
    // maybe share hash table with other fragment instances
621
    std::shared_ptr<HashTableVariants> hash_table_variants = std::make_shared<HashTableVariants>();
622
    const std::vector<TupleDescriptor*> build_side_child_desc;
623
    size_t build_exprs_size = 0;
624
    std::shared_ptr<vectorized::Block> build_block;
625
    std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
626
    bool probe_ignore_null = false;
627
};
628
629
struct PartitionedHashJoinSharedState
630
        : public HashJoinSharedState,
631
          public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
632
    ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)
633
634
    std::unique_ptr<RuntimeState> inner_runtime_state;
635
    std::shared_ptr<HashJoinSharedState> inner_shared_state;
636
    std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks;
637
    std::vector<vectorized::SpillStreamSPtr> spilled_streams;
638
    bool need_to_spill = false;
639
};
640
641
struct NestedLoopJoinSharedState : public JoinSharedState {
642
    ENABLE_FACTORY_CREATOR(NestedLoopJoinSharedState)
643
    // if true, left child has no more rows to process
644
    bool left_side_eos = false;
645
    // Visited flags for each row in build side.
646
    vectorized::MutableColumns build_side_visited_flags;
647
    // List of build blocks, constructed in prepare()
648
    vectorized::Blocks build_blocks;
649
};
650
651
struct PartitionSortNodeSharedState : public BasicSharedState {
652
    ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState)
653
public:
654
    std::queue<vectorized::Block> blocks_buffer;
655
    std::mutex buffer_mutex;
656
    std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
657
    bool sink_eos = false;
658
    std::mutex sink_eos_lock;
659
};
660
661
using SetHashTableVariants =
662
        std::variant<std::monostate, vectorized::SetSerializedHashTableContext,
663
                     vectorized::SetMethodOneString,
664
                     vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>,
665
                     vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>,
666
                     vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>,
667
                     vectorized::SetPrimaryTypeHashTableContext<UInt64>,
668
                     vectorized::SetPrimaryTypeHashTableContext<UInt128>,
669
                     vectorized::SetPrimaryTypeHashTableContext<UInt256>,
670
                     vectorized::SetFixedKeyHashTableContext<UInt64, true>,
671
                     vectorized::SetFixedKeyHashTableContext<UInt64, false>,
672
                     vectorized::SetFixedKeyHashTableContext<UInt128, true>,
673
                     vectorized::SetFixedKeyHashTableContext<UInt128, false>,
674
                     vectorized::SetFixedKeyHashTableContext<UInt256, true>,
675
                     vectorized::SetFixedKeyHashTableContext<UInt256, false>,
676
                     vectorized::SetFixedKeyHashTableContext<UInt136, true>,
677
                     vectorized::SetFixedKeyHashTableContext<UInt136, false>>;
678
679
struct SetSharedState : public BasicSharedState {
680
    ENABLE_FACTORY_CREATOR(SetSharedState)
681
public:
682
    /// default init
683
    vectorized::Block build_block; // build to source
684
    //record element size in hashtable
685
    int64_t valid_element_in_hash_tbl = 0;
686
    //first: idx mapped to column types
687
    //second: column_id, could point to origin column or cast column
688
    std::unordered_map<int, int> build_col_idx;
689
690
    //// shared static states (shared, decided in prepare/open...)
691
692
    /// init in setup_local_state
693
    std::unique_ptr<SetHashTableVariants> hash_table_variants =
694
            std::make_unique<SetHashTableVariants>(); // the real data HERE.
695
    std::vector<bool> build_not_ignore_null;
696
697
    /// init in both upstream side.
698
    //The i-th result expr list refers to the i-th child.
699
    std::vector<vectorized::VExprContextSPtrs> child_exprs_lists;
700
701
    /// init in build side
702
    int child_quantity;
703
    vectorized::VExprContextSPtrs build_child_exprs;
704
    std::vector<Dependency*> probe_finished_children_dependency;
705
706
    /// init in probe side
707
    std::vector<vectorized::VExprContextSPtrs> probe_child_exprs_lists;
708
709
    std::atomic<bool> ready_for_read = false;
710
711
    /// called in setup_local_state
712
0
    void hash_table_init() {
713
0
        using namespace vectorized;
714
0
        if (child_exprs_lists[0].size() == 1 && (!build_not_ignore_null[0])) {
715
            // Single column optimization
716
0
            switch (child_exprs_lists[0][0]->root()->result_type()) {
717
0
            case TYPE_BOOLEAN:
718
0
            case TYPE_TINYINT:
719
0
                hash_table_variants->emplace<vectorized::SetPrimaryTypeHashTableContext<UInt8>>();
720
0
                break;
721
0
            case TYPE_SMALLINT:
722
0
                hash_table_variants->emplace<vectorized::SetPrimaryTypeHashTableContext<UInt16>>();
723
0
                break;
724
0
            case TYPE_INT:
725
0
            case TYPE_FLOAT:
726
0
            case TYPE_DATEV2:
727
0
            case TYPE_DECIMAL32:
728
0
                hash_table_variants->emplace<vectorized::SetPrimaryTypeHashTableContext<UInt32>>();
729
0
                break;
730
0
            case TYPE_BIGINT:
731
0
            case TYPE_DOUBLE:
732
0
            case TYPE_DATETIME:
733
0
            case TYPE_DATE:
734
0
            case TYPE_DECIMAL64:
735
0
            case TYPE_DATETIMEV2:
736
0
                hash_table_variants->emplace<vectorized::SetPrimaryTypeHashTableContext<UInt64>>();
737
0
                break;
738
0
            case TYPE_CHAR:
739
0
            case TYPE_VARCHAR:
740
0
            case TYPE_STRING: {
741
0
                hash_table_variants->emplace<vectorized::SetMethodOneString>();
742
0
                break;
743
0
            }
744
0
            case TYPE_LARGEINT:
745
0
            case TYPE_DECIMALV2:
746
0
            case TYPE_DECIMAL128I:
747
0
                hash_table_variants->emplace<vectorized::SetPrimaryTypeHashTableContext<UInt128>>();
748
0
                break;
749
0
            default:
750
0
                hash_table_variants->emplace<SetSerializedHashTableContext>();
751
0
            }
752
0
            return;
753
0
        }
754
755
        // here need to change type to nullable, because some case eg:
756
        // (select 0) intersect (select null) the build side hash table should not
757
        // ignore null value.
758
0
        std::vector<DataTypePtr> data_types;
759
0
        for (int i = 0; i < child_exprs_lists[0].size(); i++) {
760
0
            const auto& ctx = child_exprs_lists[0][i];
761
0
            data_types.emplace_back(build_not_ignore_null[i]
762
0
                                            ? make_nullable(ctx->root()->data_type())
763
0
                                            : ctx->root()->data_type());
764
0
        }
765
0
        if (!try_get_hash_map_context_fixed<NormalHashMap, HashCRC32, RowRefListWithFlags>(
766
0
                    *hash_table_variants, data_types)) {
767
0
            hash_table_variants->emplace<SetSerializedHashTableContext>();
768
0
        }
769
0
    }
770
};
771
772
// Kind of local exchange inserted between pipeline operators. The explicit
// numeric values are kept stable.
enum class ExchangeType : uint8_t {
    NOOP = 0,                 // no local exchange is needed
    HASH_SHUFFLE = 1,         // Shuffle data by Crc32HashPartitioner<LocalExchangeChannelIds>.
    PASSTHROUGH = 2,          // Round-robin passthrough data blocks.
    BUCKET_HASH_SHUFFLE = 3,  // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine).
    BROADCAST = 4,            // Passthrough data blocks to all channels.
    ADAPTIVE_PASSTHROUGH = 5, // Passthrough data to channels evenly in an adaptive way.
    PASS_TO_ONE = 6,          // Send all data to the first channel.
    LOCAL_MERGE_SORT = 7,     // merge all data to one channel.
};
789
790
0
/// Returns the enumerator name of `idx` as a string. Any value outside the
/// enumeration is treated as a fatal programming error.
inline std::string get_exchange_type_name(ExchangeType idx) {
    const char* type_name = nullptr;
    switch (idx) {
    case ExchangeType::NOOP:
        type_name = "NOOP";
        break;
    case ExchangeType::HASH_SHUFFLE:
        type_name = "HASH_SHUFFLE";
        break;
    case ExchangeType::PASSTHROUGH:
        type_name = "PASSTHROUGH";
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        type_name = "BUCKET_HASH_SHUFFLE";
        break;
    case ExchangeType::BROADCAST:
        type_name = "BROADCAST";
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        type_name = "ADAPTIVE_PASSTHROUGH";
        break;
    case ExchangeType::PASS_TO_ONE:
        type_name = "PASS_TO_ONE";
        break;
    case ExchangeType::LOCAL_MERGE_SORT:
        type_name = "LOCAL_MERGE_SORT";
        break;
    }
    if (type_name != nullptr) {
        return type_name;
    }
    LOG(FATAL) << "__builtin_unreachable";
    __builtin_unreachable();
}
812
813
struct DataDistribution {
814
0
    DataDistribution(ExchangeType type) : distribution_type(type) {}
815
    DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
816
0
            : distribution_type(type), partition_exprs(partition_exprs_) {}
817
0
    DataDistribution(const DataDistribution& other) = default;
818
0
    bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
819
0
    DataDistribution& operator=(const DataDistribution& other) = default;
820
    ExchangeType distribution_type;
821
    std::vector<TExpr> partition_exprs;
822
};
823
824
class ExchangerBase;
825
826
struct LocalExchangeSharedState : public BasicSharedState {
827
public:
828
    ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
829
    LocalExchangeSharedState(int num_instances);
830
    ~LocalExchangeSharedState() override;
831
    std::unique_ptr<ExchangerBase> exchanger {};
832
    std::vector<RuntimeProfile::Counter*> mem_counters;
833
    std::atomic<int64_t> mem_usage = 0;
834
    // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
835
    std::mutex le_lock;
836
0
    virtual void create_dependencies(int local_exchange_id) {
837
0
        for (auto& source_dep : source_deps) {
838
0
            source_dep = std::make_shared<Dependency>(local_exchange_id, local_exchange_id,
839
0
                                                      "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
840
0
            source_dep->set_shared_state(this);
841
0
        }
842
0
    }
843
    void sub_running_sink_operators();
844
    void sub_running_source_operators(LocalExchangeSourceLocalState& local_state);
845
0
    void _set_always_ready() {
846
0
        for (auto& dep : source_deps) {
847
0
            DCHECK(dep);
848
0
            dep->set_always_ready();
849
0
        }
850
0
        for (auto& dep : sink_deps) {
851
0
            DCHECK(dep);
852
0
            dep->set_always_ready();
853
0
        }
854
0
    }
855
856
0
    virtual std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
857
0
        return {source_deps[channel_id]};
858
0
    }
859
0
    virtual Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; }
860
861
0
    void set_ready_to_read(int channel_id) {
862
0
        auto& dep = source_deps[channel_id];
863
0
        DCHECK(dep) << channel_id;
864
0
        dep->set_ready();
865
0
    }
866
867
0
    void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
868
0
        mem_counters[channel_id]->update(delta);
869
0
        if (update_total_mem_usage) {
870
0
            add_total_mem_usage(delta, channel_id);
871
0
        }
872
0
    }
873
874
0
    void sub_mem_usage(int channel_id, size_t delta) {
875
0
        mem_counters[channel_id]->update(-(int64_t)delta);
876
0
    }
877
878
0
    virtual void add_total_mem_usage(size_t delta, int channel_id) {
879
0
        if (mem_usage.fetch_add(delta) + delta > config::local_exchange_buffer_mem_limit) {
880
0
            sink_deps.front()->block();
881
0
        }
882
0
    }
883
884
0
    virtual void sub_total_mem_usage(size_t delta, int channel_id) {
885
0
        auto prev_usage = mem_usage.fetch_sub(delta);
886
0
        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
887
0
                                         << " channel_id: " << channel_id;
888
0
        if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
889
0
            sink_deps.front()->set_ready();
890
0
        }
891
0
    }
892
};
893
894
struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
895
    ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
896
    LocalMergeExchangeSharedState(int num_instances)
897
            : LocalExchangeSharedState(num_instances),
898
              _queues_mem_usage(num_instances),
899
0
              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {
900
0
        for (size_t i = 0; i < num_instances; i++) {
901
0
            _queues_mem_usage[i] = 0;
902
0
        }
903
0
    }
904
905
0
    void create_dependencies(int local_exchange_id) override {
906
0
        sink_deps.resize(source_deps.size());
907
0
        for (size_t i = 0; i < source_deps.size(); i++) {
908
0
            source_deps[i] =
909
0
                    std::make_shared<Dependency>(local_exchange_id, local_exchange_id,
910
0
                                                 "LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY");
911
0
            source_deps[i]->set_shared_state(this);
912
0
            sink_deps[i] = std::make_shared<Dependency>(
913
0
                    local_exchange_id, local_exchange_id,
914
0
                    "LOCAL_MERGE_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
915
0
            sink_deps[i]->set_shared_state(this);
916
0
        }
917
0
    }
918
919
0
    void sub_total_mem_usage(size_t delta, int channel_id) override {
920
0
        auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
921
0
        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta
922
0
                                         << " channel_id: " << channel_id;
923
0
        if (prev_usage - delta <= _each_queue_limit) {
924
0
            sink_deps[channel_id]->set_ready();
925
0
        }
926
0
        if (_queues_mem_usage[channel_id] == 0) {
927
0
            source_deps[channel_id]->block();
928
0
        }
929
0
    }
930
0
    void add_total_mem_usage(size_t delta, int channel_id) override {
931
0
        if (_queues_mem_usage[channel_id].fetch_add(delta) + delta > _each_queue_limit) {
932
0
            sink_deps[channel_id]->block();
933
0
        }
934
0
        source_deps[channel_id]->set_ready();
935
0
    }
936
937
0
    Dependency* get_sink_dep_by_channel_id(int channel_id) override {
938
0
        return sink_deps[channel_id].get();
939
0
    }
940
941
0
    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) override {
942
0
        return source_deps;
943
0
    }
944
945
private:
946
    std::vector<std::atomic_int64_t> _queues_mem_usage;
947
    const int64_t _each_queue_limit;
948
};
949
950
} // namespace doris::pipeline