Coverage Report

Created: 2025-05-29 15:46

/root/doris/be/src/pipeline/dependency.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <concurrentqueue.h>
21
#include <sqltypes.h>
22
23
#include <atomic>
24
#include <functional>
25
#include <memory>
26
#include <mutex>
27
#include <thread>
28
#include <utility>
29
30
#include "common/config.h"
31
#include "common/logging.h"
32
#include "gen_cpp/internal_service.pb.h"
33
#include "gutil/integral_types.h"
34
#include "pipeline/common/agg_utils.h"
35
#include "pipeline/common/join_utils.h"
36
#include "pipeline/common/set_utils.h"
37
#include "pipeline/exec/data_queue.h"
38
#include "pipeline/exec/join/process_hash_table_probe.h"
39
#include "util/brpc_closure.h"
40
#include "util/stack_util.h"
41
#include "vec/common/sort/partition_sorter.h"
42
#include "vec/common/sort/sorter.h"
43
#include "vec/core/block.h"
44
#include "vec/core/types.h"
45
#include "vec/spill/spill_stream.h"
46
47
namespace doris::vectorized {
48
class AggFnEvaluator;
49
class VSlotRef;
50
} // namespace doris::vectorized
51
52
namespace doris::pipeline {
53
#include "common/compile_check_begin.h"
54
class Dependency;
55
class PipelineTask;
56
struct BasicSharedState;
57
using DependencySPtr = std::shared_ptr<Dependency>;
58
class LocalExchangeSourceLocalState;
59
60
static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
61
static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
62
static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
63
64
struct BasicSharedState {
65
    ENABLE_FACTORY_CREATOR(BasicSharedState)
66
67
    template <class TARGET>
68
3.47M
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
3.47M
        return reinterpret_cast<TARGET*>(this);
73
3.47M
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_26MaterializationSharedStateEEEPT_v
Line
Count
Source
68
2.66k
    TARGET* cast() {
69
2.66k
        DCHECK(dynamic_cast<TARGET*>(this))
70
0
                << " Mismatch type! Current type is " << typeid(*this).name()
71
0
                << " and expect type is" << typeid(TARGET).name();
72
2.66k
        return reinterpret_cast<TARGET*>(this);
73
2.66k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_19HashJoinSharedStateEEEPT_v
Line
Count
Source
68
307k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
307k
        return reinterpret_cast<TARGET*>(this);
73
307k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_30PartitionedHashJoinSharedStateEEEPT_v
Line
Count
Source
68
71.6k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
71.6k
        return reinterpret_cast<TARGET*>(this);
73
71.6k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_15SortSharedStateEEEPT_v
Line
Count
Source
68
632k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
632k
        return reinterpret_cast<TARGET*>(this);
73
632k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_20SpillSortSharedStateEEEPT_v
Line
Count
Source
68
473k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
473k
        return reinterpret_cast<TARGET*>(this);
73
473k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_25NestedLoopJoinSharedStateEEEPT_v
Line
Count
Source
68
27.5k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
27.5k
        return reinterpret_cast<TARGET*>(this);
73
27.5k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_19AnalyticSharedStateEEEPT_v
Line
Count
Source
68
43.5k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
43.5k
        return reinterpret_cast<TARGET*>(this);
73
43.5k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_14AggSharedStateEEEPT_v
Line
Count
Source
68
441k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
441k
        return reinterpret_cast<TARGET*>(this);
73
441k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_25PartitionedAggSharedStateEEEPT_v
Line
Count
Source
68
88.2k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
88.2k
        return reinterpret_cast<TARGET*>(this);
73
88.2k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_16UnionSharedStateEEEPT_v
Line
Count
Source
68
15.9k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
15.9k
        return reinterpret_cast<TARGET*>(this);
73
15.9k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_28PartitionSortNodeSharedStateEEEPT_v
Line
Count
Source
68
1.00k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
1.00k
        return reinterpret_cast<TARGET*>(this);
73
1.00k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_20MultiCastSharedStateEEEPT_v
Line
Count
Source
68
8.04k
    TARGET* cast() {
69
8.04k
        DCHECK(dynamic_cast<TARGET*>(this))
70
0
                << " Mismatch type! Current type is " << typeid(*this).name()
71
0
                << " and expect type is" << typeid(TARGET).name();
72
8.04k
        return reinterpret_cast<TARGET*>(this);
73
8.04k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_14SetSharedStateEEEPT_v
Line
Count
Source
68
4.55k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
4.55k
        return reinterpret_cast<TARGET*>(this);
73
4.55k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_24LocalExchangeSharedStateEEEPT_v
Line
Count
Source
68
962k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
962k
        return reinterpret_cast<TARGET*>(this);
73
962k
    }
_ZN5doris8pipeline16BasicSharedState4castIS1_EEPT_v
Line
Count
Source
68
390k
    TARGET* cast() {
69
18.4E
        DCHECK(dynamic_cast<TARGET*>(this))
70
18.4E
                << " Mismatch type! Current type is " << typeid(*this).name()
71
18.4E
                << " and expect type is" << typeid(TARGET).name();
72
390k
        return reinterpret_cast<TARGET*>(this);
73
390k
    }
_ZN5doris8pipeline16BasicSharedState4castINS0_20DataQueueSharedStateEEEPT_v
Line
Count
Source
68
358
    TARGET* cast() {
69
358
        DCHECK(dynamic_cast<TARGET*>(this))
70
0
                << " Mismatch type! Current type is " << typeid(*this).name()
71
0
                << " and expect type is" << typeid(TARGET).name();
72
358
        return reinterpret_cast<TARGET*>(this);
73
358
    }
74
    template <class TARGET>
75
    const TARGET* cast() const {
76
        DCHECK(dynamic_cast<const TARGET*>(this))
77
                << " Mismatch type! Current type is " << typeid(*this).name()
78
                << " and expect type is" << typeid(TARGET).name();
79
        return reinterpret_cast<const TARGET*>(this);
80
    }
81
    std::vector<DependencySPtr> source_deps;
82
    std::vector<DependencySPtr> sink_deps;
83
    int id = 0;
84
    std::set<int> related_op_ids;
85
86
2.30M
    virtual ~BasicSharedState() = default;
87
88
    void create_source_dependencies(int num_sources, int operator_id, int node_id,
89
                                    const std::string& name);
90
    virtual Dependency* create_source_dependency(int operator_id, int node_id,
91
                                                 const std::string& name);
92
93
    Dependency* create_sink_dependency(int dest_id, int node_id, const std::string& name);
94
699k
    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
95
699k
        DCHECK_LT(channel_id, source_deps.size());
96
699k
        return {source_deps[channel_id]};
97
699k
    }
98
};
99
100
class Dependency : public std::enable_shared_from_this<Dependency> {
101
public:
102
    ENABLE_FACTORY_CREATOR(Dependency);
103
    Dependency(int id, int node_id, std::string name, bool ready = false)
104
8.02M
            : _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {}
105
8.04M
    virtual ~Dependency() = default;
106
107
    [[nodiscard]] int id() const { return _id; }
108
12.0M
    [[nodiscard]] virtual std::string name() const { return _name; }
109
325k
    BasicSharedState* shared_state() { return _shared_state; }
110
3.33M
    void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; }
111
    virtual std::string debug_string(int indentation_level = 0);
112
657M
    bool ready() const { return _ready; }
113
114
    // Start the watcher. We use it to count how long this dependency block the current pipeline task.
115
7.97M
    void start_watcher() { _watcher.start(); }
116
7.65M
    [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
117
118
    // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
119
    [[nodiscard]] Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr);
120
    // Notify downstream pipeline tasks this dependency is ready.
121
    void set_ready();
122
770k
    void set_ready_to_read(int channel_id = 0) {
123
770k
        DCHECK_LT(channel_id, _shared_state->source_deps.size()) << debug_string();
124
770k
        _shared_state->source_deps[channel_id]->set_ready();
125
770k
    }
126
782
    void set_ready_to_write() {
127
782
        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
128
782
        _shared_state->sink_deps.front()->set_ready();
129
782
    }
130
131
    // Notify downstream pipeline tasks this dependency is blocked.
132
4.30M
    void block() {
133
4.30M
        if (_always_ready) {
134
390k
            return;
135
390k
        }
136
3.91M
        std::unique_lock<std::mutex> lc(_always_ready_lock);
137
3.91M
        if (_always_ready) {
138
2
            return;
139
2
        }
140
3.91M
        _ready = false;
141
3.91M
    }
142
143
4.14M
    void set_always_ready() {
144
4.14M
        if (_always_ready) {
145
2.00M
            return;
146
2.00M
        }
147
2.13M
        std::unique_lock<std::mutex> lc(_always_ready_lock);
148
2.13M
        if (_always_ready) {
149
0
            return;
150
0
        }
151
2.13M
        _always_ready = true;
152
2.13M
        set_ready();
153
2.13M
    }
154
155
protected:
156
    void _add_block_task(std::shared_ptr<PipelineTask> task);
157
158
    const int _id;
159
    const int _node_id;
160
    const std::string _name;
161
    std::atomic<bool> _ready;
162
163
    BasicSharedState* _shared_state = nullptr;
164
    MonotonicStopWatch _watcher;
165
166
    std::mutex _task_lock;
167
    std::vector<std::weak_ptr<PipelineTask>> _blocked_task;
168
169
    // If `_always_ready` is true, `block()` will never block tasks.
170
    std::atomic<bool> _always_ready = false;
171
    std::mutex _always_ready_lock;
172
};
173
174
struct FakeSharedState final : public BasicSharedState {
175
    ENABLE_FACTORY_CREATOR(FakeSharedState)
176
};
177
178
class CountedFinishDependency final : public Dependency {
179
public:
180
    using SharedState = FakeSharedState;
181
    CountedFinishDependency(int id, int node_id, std::string name)
182
180k
            : Dependency(id, node_id, std::move(name), true) {}
183
184
9.44k
    void add(uint32_t count = 1) {
185
9.44k
        std::unique_lock<std::mutex> l(_mtx);
186
9.44k
        if (!_counter) {
187
9.44k
            block();
188
9.44k
        }
189
9.44k
        _counter += count;
190
9.44k
    }
191
192
8.11k
    void sub() {
193
8.11k
        std::unique_lock<std::mutex> l(_mtx);
194
8.11k
        _counter--;
195
8.11k
        if (!_counter) {
196
8.10k
            set_ready();
197
8.10k
        }
198
8.11k
    }
199
200
    std::string debug_string(int indentation_level = 0) override;
201
202
private:
203
    std::mutex _mtx;
204
    uint32_t _counter = 0;
205
};
206
207
struct RuntimeFilterTimerQueue;
208
class RuntimeFilterTimer {
209
public:
210
    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
211
                       std::shared_ptr<Dependency> parent)
212
            : _parent(std::move(parent)),
213
              _registration_time(registration_time),
214
49.6k
              _wait_time_ms(wait_time_ms) {}
215
216
    // Called by runtime filter producer.
217
    void call_ready();
218
219
    // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout.
220
    void call_timeout();
221
222
2.18M
    int64_t registration_time() const { return _registration_time; }
223
2.18M
    int32_t wait_time_ms() const { return _wait_time_ms; }
224
225
    void set_local_runtime_filter_dependencies(
226
19.3k
            const std::vector<std::shared_ptr<Dependency>>& deps) {
227
19.3k
        _local_runtime_filter_dependencies = deps;
228
19.3k
    }
229
230
    bool should_be_check_timeout();
231
232
private:
233
    friend struct RuntimeFilterTimerQueue;
234
    std::shared_ptr<Dependency> _parent = nullptr;
235
    std::vector<std::shared_ptr<Dependency>> _local_runtime_filter_dependencies;
236
    std::mutex _lock;
237
    int64_t _registration_time;
238
    const int32_t _wait_time_ms;
239
};
240
241
struct RuntimeFilterTimerQueue {
242
    constexpr static int64_t interval = 10;
243
5
    void run() { _thread.detach(); }
244
    void start();
245
246
2
    void stop() {
247
2
        _stop = true;
248
2
        cv.notify_all();
249
2
        wait_for_shutdown();
250
2
    }
251
252
2
    void wait_for_shutdown() const {
253
4
        while (!_shutdown) {
254
2
            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
255
2
        }
256
2
    }
257
258
2
    ~RuntimeFilterTimerQueue() = default;
259
5
    RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); }
260
32.3k
    void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) {
261
32.3k
        std::unique_lock<std::mutex> lc(_que_lock);
262
32.3k
        _que.insert(_que.end(), filter.begin(), filter.end());
263
32.3k
        cv.notify_all();
264
32.3k
    }
265
266
    std::thread _thread;
267
    std::condition_variable cv;
268
    std::mutex cv_m;
269
    std::mutex _que_lock;
270
    std::atomic_bool _stop = false;
271
    std::atomic_bool _shutdown = false;
272
    std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> _que;
273
};
274
275
struct AggSharedState : public BasicSharedState {
276
    ENABLE_FACTORY_CREATOR(AggSharedState)
277
public:
278
220k
    AggSharedState() {
279
220k
        agg_data = std::make_unique<AggregatedDataVariants>();
280
220k
        agg_arena_pool = std::make_unique<vectorized::Arena>();
281
220k
    }
282
220k
    ~AggSharedState() override {
283
220k
        if (!probe_expr_ctxs.empty()) {
284
85.9k
            _close_with_serialized_key();
285
134k
        } else {
286
134k
            _close_without_key();
287
134k
        }
288
220k
    }
289
290
    Status reset_hash_table();
291
292
    bool do_limit_filter(vectorized::Block* block, size_t num_rows,
293
                         const std::vector<int>* key_locs = nullptr);
294
    void build_limit_heap(size_t hash_table_size);
295
296
    // We should call this function only at 1st phase.
297
    // 1st phase: is_merge=true, only have one SlotRef.
298
    // 2nd phase: is_merge=false, maybe have multiple exprs.
299
    static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator);
300
301
    AggregatedDataVariantsUPtr agg_data = nullptr;
302
    std::unique_ptr<AggregateDataContainer> aggregate_data_container;
303
    ArenaUPtr agg_arena_pool;
304
    std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators;
305
    // group by k1,k2
306
    vectorized::VExprContextSPtrs probe_expr_ctxs;
307
    size_t input_num_rows = 0;
308
    std::vector<vectorized::AggregateDataPtr> values;
309
    /// The total size of the row from the aggregate functions.
310
    size_t total_size_of_aggregate_states = 0;
311
    size_t align_aggregate_states = 1;
312
    /// The offset to the n-th aggregate function in a row of aggregate functions.
313
    vectorized::Sizes offsets_of_aggregate_states;
314
    std::vector<size_t> make_nullable_keys;
315
316
    bool agg_data_created_without_key = false;
317
    bool enable_spill = false;
318
    bool reach_limit = false;
319
320
    int64_t limit = -1;
321
    bool do_sort_limit = false;
322
    vectorized::MutableColumns limit_columns;
323
    int limit_columns_min = -1;
324
    vectorized::PaddedPODArray<uint8_t> need_computes;
325
    std::vector<uint8_t> cmp_res;
326
    std::vector<int> order_directions;
327
    std::vector<int> null_directions;
328
329
    struct HeapLimitCursor {
330
        HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns,
331
                        std::vector<int>& order_directions, std::vector<int>& null_directions)
332
                : _row_id(row_id),
333
                  _limit_columns(limit_columns),
334
                  _order_directions(order_directions),
335
29.5k
                  _null_directions(null_directions) {}
336
337
        HeapLimitCursor(const HeapLimitCursor& other) = default;
338
339
        HeapLimitCursor(HeapLimitCursor&& other) noexcept
340
                : _row_id(other._row_id),
341
                  _limit_columns(other._limit_columns),
342
                  _order_directions(other._order_directions),
343
180k
                  _null_directions(other._null_directions) {}
344
345
0
        HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept {
346
0
            _row_id = other._row_id;
347
0
            return *this;
348
0
        }
349
350
350k
        HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept {
351
350k
            _row_id = other._row_id;
352
350k
            return *this;
353
350k
        }
354
355
319k
        bool operator<(const HeapLimitCursor& rhs) const {
356
423k
            for (int i = 0; i < _limit_columns.size(); ++i) {
357
423k
                const auto& _limit_column = _limit_columns[i];
358
423k
                auto res = _limit_column->compare_at(_row_id, rhs._row_id, *_limit_column,
359
423k
                                                     _null_directions[i]) *
360
423k
                           _order_directions[i];
361
423k
                if (res < 0) {
362
162k
                    return true;
363
260k
                } else if (res > 0) {
364
157k
                    return false;
365
157k
                }
366
423k
            }
367
18.4E
            return false;
368
319k
        }
369
370
        int _row_id;
371
        vectorized::MutableColumns& _limit_columns;
372
        std::vector<int>& _order_directions;
373
        std::vector<int>& _null_directions;
374
    };
375
376
    std::priority_queue<HeapLimitCursor> limit_heap;
377
378
    // Refresh the top limit heap with a new row
379
    void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns);
380
381
private:
382
    vectorized::MutableColumns _get_keys_hash_table();
383
384
85.8k
    void _close_with_serialized_key() {
385
85.8k
        std::visit(vectorized::Overload {[&](std::monostate& arg) -> void {
386
                                             // Do nothing
387
0
                                         },
388
85.8k
                                         [&](auto& agg_method) -> void {
389
85.8k
                                             auto& data = *agg_method.hash_table;
390
3.27M
                                             data.for_each_mapped([&](auto& mapped) {
391
3.27M
                                                 if (mapped) {
392
3.27M
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
3.27M
                                                     mapped = nullptr;
394
3.27M
                                                 }
395
3.27M
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vEEEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Line
Count
Source
390
117k
                                             data.for_each_mapped([&](auto& mapped) {
391
117k
                                                 if (mapped) {
392
117k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
117k
                                                     mapped = nullptr;
394
117k
                                                 }
395
117k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc9HashCRC32IhEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
986
                                             data.for_each_mapped([&](auto& mapped) {
391
986
                                                 if (mapped) {
392
986
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
986
                                                     mapped = nullptr;
394
986
                                                 }
395
986
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc9HashCRC32ItEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
20
                                             data.for_each_mapped([&](auto& mapped) {
391
20
                                                 if (mapped) {
392
20
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
20
                                                     mapped = nullptr;
394
20
                                                 }
395
20
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
2.26M
                                             data.for_each_mapped([&](auto& mapped) {
391
2.26M
                                                 if (mapped) {
392
2.26M
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2.26M
                                                     mapped = nullptr;
394
2.26M
                                                 }
395
2.26M
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
43.6k
                                             data.for_each_mapped([&](auto& mapped) {
391
43.6k
                                                 if (mapped) {
392
43.6k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
43.6k
                                                     mapped = nullptr;
394
43.6k
                                                 }
395
43.6k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorEEEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
2.68k
                                             data.for_each_mapped([&](auto& mapped) {
391
2.68k
                                                 if (mapped) {
392
2.68k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2.68k
                                                     mapped = nullptr;
394
2.68k
                                                 }
395
2.68k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Line
Count
Source
390
80
                                             data.for_each_mapped([&](auto& mapped) {
391
80
                                                 if (mapped) {
392
80
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
80
                                                     mapped = nullptr;
394
80
                                                 }
395
80
                                             });
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm256EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
70.3k
                                             data.for_each_mapped([&](auto& mapped) {
391
70.3k
                                                 if (mapped) {
392
70.3k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
70.3k
                                                     mapped = nullptr;
394
70.3k
                                                 }
395
70.3k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
42.0k
                                             data.for_each_mapped([&](auto& mapped) {
391
42.0k
                                                 if (mapped) {
392
42.0k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
42.0k
                                                     mapped = nullptr;
394
42.0k
                                                 }
395
42.0k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc9HashCRC32IhEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
2.76k
                                             data.for_each_mapped([&](auto& mapped) {
391
2.76k
                                                 if (mapped) {
392
2.76k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2.76k
                                                     mapped = nullptr;
394
2.76k
                                                 }
395
2.76k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc9HashCRC32ItEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
888
                                             data.for_each_mapped([&](auto& mapped) {
391
888
                                                 if (mapped) {
392
888
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
888
                                                     mapped = nullptr;
394
888
                                                 }
395
888
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
70.5k
                                             data.for_each_mapped([&](auto& mapped) {
391
70.5k
                                                 if (mapped) {
392
70.5k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
70.5k
                                                     mapped = nullptr;
394
70.5k
                                                 }
395
70.5k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
6.59k
                                             data.for_each_mapped([&](auto& mapped) {
391
6.59k
                                                 if (mapped) {
392
6.59k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
6.59k
                                                     mapped = nullptr;
394
6.59k
                                                 }
395
6.59k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
41.6k
                                             data.for_each_mapped([&](auto& mapped) {
391
41.6k
                                                 if (mapped) {
392
41.6k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
41.6k
                                                     mapped = nullptr;
394
41.6k
                                                 }
395
41.6k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
798
                                             data.for_each_mapped([&](auto& mapped) {
391
798
                                                 if (mapped) {
392
798
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
798
                                                     mapped = nullptr;
394
798
                                                 }
395
798
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3_
Line
Count
Source
390
86
                                             data.for_each_mapped([&](auto& mapped) {
391
86
                                                 if (mapped) {
392
86
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
86
                                                     mapped = nullptr;
394
86
                                                 }
395
86
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm256EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3_
Line
Count
Source
390
6
                                             data.for_each_mapped([&](auto& mapped) {
391
6
                                                 if (mapped) {
392
6
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
6
                                                     mapped = nullptr;
394
6
                                                 }
395
6
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorEEEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_
Line
Count
Source
390
5.64k
                                             data.for_each_mapped([&](auto& mapped) {
391
5.64k
                                                 if (mapped) {
392
5.64k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
5.64k
                                                     mapped = nullptr;
394
5.64k
                                                 }
395
5.64k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_
Line
Count
Source
390
488k
                                             data.for_each_mapped([&](auto& mapped) {
391
489k
                                                 if (mapped) {
392
489k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
489k
                                                     mapped = nullptr;
394
489k
                                                 }
395
488k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Line
Count
Source
390
101k
                                             data.for_each_mapped([&](auto& mapped) {
391
101k
                                                 if (mapped) {
392
101k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
101k
                                                     mapped = nullptr;
394
101k
                                                 }
395
101k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_
Line
Count
Source
390
8.82k
                                             data.for_each_mapped([&](auto& mapped) {
391
8.82k
                                                 if (mapped) {
392
8.82k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
8.82k
                                                     mapped = nullptr;
394
8.82k
                                                 }
395
8.82k
                                             });
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_EEEEEEvS3_ENKUlS3_E_clISA_EEDaS3_
Line
Count
Source
390
4.13k
                                             data.for_each_mapped([&](auto& mapped) {
391
4.13k
                                                 if (mapped) {
392
4.13k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
4.13k
                                                     mapped = nullptr;
394
4.13k
                                                 }
395
4.13k
                                             });
396
85.8k
                                             if (data.has_null_key_data()) {
397
3.73k
                                                 auto st = _destroy_agg_status(
398
3.73k
                                                         data.template get_null_key_data<
399
3.73k
                                                                 vectorized::AggregateDataPtr>());
400
3.73k
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
3.73k
                                             }
404
85.8k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vEEEEEEvS3_
Line
Count
Source
388
9.63k
                                         [&](auto& agg_method) -> void {
389
9.63k
                                             auto& data = *agg_method.hash_table;
390
9.63k
                                             data.for_each_mapped([&](auto& mapped) {
391
9.63k
                                                 if (mapped) {
392
9.63k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
9.63k
                                                     mapped = nullptr;
394
9.63k
                                                 }
395
9.63k
                                             });
396
9.63k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
9.63k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc9HashCRC32IhEEEEEEvS3_
Line
Count
Source
388
3.42k
                                         [&](auto& agg_method) -> void {
389
3.42k
                                             auto& data = *agg_method.hash_table;
390
3.42k
                                             data.for_each_mapped([&](auto& mapped) {
391
3.42k
                                                 if (mapped) {
392
3.42k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
3.42k
                                                     mapped = nullptr;
394
3.42k
                                                 }
395
3.42k
                                             });
396
3.42k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
3.42k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc9HashCRC32ItEEEEEEvS3_
Line
Count
Source
388
116
                                         [&](auto& agg_method) -> void {
389
116
                                             auto& data = *agg_method.hash_table;
390
116
                                             data.for_each_mapped([&](auto& mapped) {
391
116
                                                 if (mapped) {
392
116
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
116
                                                     mapped = nullptr;
394
116
                                                 }
395
116
                                             });
396
116
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
116
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjEEEEEEvS3_
Line
Count
Source
388
5.50k
                                         [&](auto& agg_method) -> void {
389
5.50k
                                             auto& data = *agg_method.hash_table;
390
5.50k
                                             data.for_each_mapped([&](auto& mapped) {
391
5.50k
                                                 if (mapped) {
392
5.50k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
5.50k
                                                     mapped = nullptr;
394
5.50k
                                                 }
395
5.50k
                                             });
396
5.50k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
5.50k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImEEEEEEvS3_
Line
Count
Source
388
562
                                         [&](auto& agg_method) -> void {
389
562
                                             auto& data = *agg_method.hash_table;
390
562
                                             data.for_each_mapped([&](auto& mapped) {
391
562
                                                 if (mapped) {
392
562
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
562
                                                     mapped = nullptr;
394
562
                                                 }
395
562
                                             });
396
562
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
562
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorEEEEEEEEEvS3_
Line
Count
Source
388
1.58k
                                         [&](auto& agg_method) -> void {
389
1.58k
                                             auto& data = *agg_method.hash_table;
390
1.58k
                                             data.for_each_mapped([&](auto& mapped) {
391
1.58k
                                                 if (mapped) {
392
1.58k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
1.58k
                                                     mapped = nullptr;
394
1.58k
                                                 }
395
1.58k
                                             });
396
1.58k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
1.58k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_
Line
Count
Source
388
122
                                         [&](auto& agg_method) -> void {
389
122
                                             auto& data = *agg_method.hash_table;
390
122
                                             data.for_each_mapped([&](auto& mapped) {
391
122
                                                 if (mapped) {
392
122
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
122
                                                     mapped = nullptr;
394
122
                                                 }
395
122
                                             });
396
122
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
122
                                         }},
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm256EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEvS3_
Line
Count
Source
388
3.20k
                                         [&](auto& agg_method) -> void {
389
3.20k
                                             auto& data = *agg_method.hash_table;
390
3.20k
                                             data.for_each_mapped([&](auto& mapped) {
391
3.20k
                                                 if (mapped) {
392
3.20k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
3.20k
                                                     mapped = nullptr;
394
3.20k
                                                 }
395
3.20k
                                             });
396
3.20k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
3.20k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEvS3_
Line
Count
Source
388
682
                                         [&](auto& agg_method) -> void {
389
682
                                             auto& data = *agg_method.hash_table;
390
682
                                             data.for_each_mapped([&](auto& mapped) {
391
682
                                                 if (mapped) {
392
682
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
682
                                                     mapped = nullptr;
394
682
                                                 }
395
682
                                             });
396
682
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
682
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc9HashCRC32IhEEEEEEEEEEvS3_
Line
Count
Source
388
7.54k
                                         [&](auto& agg_method) -> void {
389
7.54k
                                             auto& data = *agg_method.hash_table;
390
7.54k
                                             data.for_each_mapped([&](auto& mapped) {
391
7.54k
                                                 if (mapped) {
392
7.54k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
7.54k
                                                     mapped = nullptr;
394
7.54k
                                                 }
395
7.54k
                                             });
396
7.54k
                                             if (data.has_null_key_data()) {
397
998
                                                 auto st = _destroy_agg_status(
398
998
                                                         data.template get_null_key_data<
399
998
                                                                 vectorized::AggregateDataPtr>());
400
998
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
998
                                             }
404
7.54k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc9HashCRC32ItEEEEEEEEEEvS3_
Line
Count
Source
388
900
                                         [&](auto& agg_method) -> void {
389
900
                                             auto& data = *agg_method.hash_table;
390
900
                                             data.for_each_mapped([&](auto& mapped) {
391
900
                                                 if (mapped) {
392
900
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
900
                                                     mapped = nullptr;
394
900
                                                 }
395
900
                                             });
396
900
                                             if (data.has_null_key_data()) {
397
78
                                                 auto st = _destroy_agg_status(
398
78
                                                         data.template get_null_key_data<
399
78
                                                                 vectorized::AggregateDataPtr>());
400
78
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
78
                                             }
404
900
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjEEEEEEEEEEvS3_
Line
Count
Source
388
14.9k
                                         [&](auto& agg_method) -> void {
389
14.9k
                                             auto& data = *agg_method.hash_table;
390
14.9k
                                             data.for_each_mapped([&](auto& mapped) {
391
14.9k
                                                 if (mapped) {
392
14.9k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
14.9k
                                                     mapped = nullptr;
394
14.9k
                                                 }
395
14.9k
                                             });
396
14.9k
                                             if (data.has_null_key_data()) {
397
1.28k
                                                 auto st = _destroy_agg_status(
398
1.28k
                                                         data.template get_null_key_data<
399
1.28k
                                                                 vectorized::AggregateDataPtr>());
400
1.28k
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
1.28k
                                             }
404
14.9k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImEEEEEEEEEEvS3_
Line
Count
Source
388
2.21k
                                         [&](auto& agg_method) -> void {
389
2.21k
                                             auto& data = *agg_method.hash_table;
390
2.21k
                                             data.for_each_mapped([&](auto& mapped) {
391
2.21k
                                                 if (mapped) {
392
2.21k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2.21k
                                                     mapped = nullptr;
394
2.21k
                                                 }
395
2.21k
                                             });
396
2.21k
                                             if (data.has_null_key_data()) {
397
146
                                                 auto st = _destroy_agg_status(
398
146
                                                         data.template get_null_key_data<
399
146
                                                                 vectorized::AggregateDataPtr>());
400
146
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
146
                                             }
404
2.21k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEEEEEvS3_
Line
Count
Source
388
9.87k
                                         [&](auto& agg_method) -> void {
389
9.87k
                                             auto& data = *agg_method.hash_table;
390
9.87k
                                             data.for_each_mapped([&](auto& mapped) {
391
9.87k
                                                 if (mapped) {
392
9.87k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
9.87k
                                                     mapped = nullptr;
394
9.87k
                                                 }
395
9.87k
                                             });
396
9.87k
                                             if (data.has_null_key_data()) {
397
786
                                                 auto st = _destroy_agg_status(
398
786
                                                         data.template get_null_key_data<
399
786
                                                                 vectorized::AggregateDataPtr>());
400
786
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
786
                                             }
404
9.87k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEEEEEvS3_
Line
Count
Source
388
1.68k
                                         [&](auto& agg_method) -> void {
389
1.68k
                                             auto& data = *agg_method.hash_table;
390
1.68k
                                             data.for_each_mapped([&](auto& mapped) {
391
1.68k
                                                 if (mapped) {
392
1.68k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
1.68k
                                                     mapped = nullptr;
394
1.68k
                                                 }
395
1.68k
                                             });
396
1.68k
                                             if (data.has_null_key_data()) {
397
141
                                                 auto st = _destroy_agg_status(
398
141
                                                         data.template get_null_key_data<
399
141
                                                                 vectorized::AggregateDataPtr>());
400
141
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
141
                                             }
404
1.68k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_
Line
Count
Source
388
64
                                         [&](auto& agg_method) -> void {
389
64
                                             auto& data = *agg_method.hash_table;
390
64
                                             data.for_each_mapped([&](auto& mapped) {
391
64
                                                 if (mapped) {
392
64
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
64
                                                     mapped = nullptr;
394
64
                                                 }
395
64
                                             });
396
64
                                             if (data.has_null_key_data()) {
397
8
                                                 auto st = _destroy_agg_status(
398
8
                                                         data.template get_null_key_data<
399
8
                                                                 vectorized::AggregateDataPtr>());
400
8
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
8
                                             }
404
64
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm256EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_
Line
Count
Source
388
2
                                         [&](auto& agg_method) -> void {
389
2
                                             auto& data = *agg_method.hash_table;
390
2
                                             data.for_each_mapped([&](auto& mapped) {
391
2
                                                 if (mapped) {
392
2
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2
                                                     mapped = nullptr;
394
2
                                                 }
395
2
                                             });
396
2
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
2
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorEEEEEEEEEEEEEvS3_
Line
Count
Source
388
2.76k
                                         [&](auto& agg_method) -> void {
389
2.76k
                                             auto& data = *agg_method.hash_table;
390
2.76k
                                             data.for_each_mapped([&](auto& mapped) {
391
2.76k
                                                 if (mapped) {
392
2.76k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
2.76k
                                                     mapped = nullptr;
394
2.76k
                                                 }
395
2.76k
                                             });
396
2.76k
                                             if (data.has_null_key_data()) {
397
294
                                                 auto st = _destroy_agg_status(
398
294
                                                         data.template get_null_key_data<
399
294
                                                                 vectorized::AggregateDataPtr>());
400
294
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
294
                                             }
404
2.76k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImEEEEEEvS3_
Line
Count
Source
388
1.17k
                                         [&](auto& agg_method) -> void {
389
1.17k
                                             auto& data = *agg_method.hash_table;
390
1.17k
                                             data.for_each_mapped([&](auto& mapped) {
391
1.17k
                                                 if (mapped) {
392
1.17k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
1.17k
                                                     mapped = nullptr;
394
1.17k
                                                 }
395
1.17k
                                             });
396
1.17k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
1.17k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_EEEEEEvS3_
Line
Count
Source
388
12.2k
                                         [&](auto& agg_method) -> void {
389
12.2k
                                             auto& data = *agg_method.hash_table;
390
12.2k
                                             data.for_each_mapped([&](auto& mapped) {
391
12.2k
                                                 if (mapped) {
392
12.2k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
12.2k
                                                     mapped = nullptr;
394
12.2k
                                                 }
395
12.2k
                                             });
396
12.2k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
12.2k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_EEEEEEvS3_
Line
Count
Source
388
1.46k
                                         [&](auto& agg_method) -> void {
389
1.46k
                                             auto& data = *agg_method.hash_table;
390
1.46k
                                             data.for_each_mapped([&](auto& mapped) {
391
1.46k
                                                 if (mapped) {
392
1.46k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
1.46k
                                                     mapped = nullptr;
394
1.46k
                                                 }
395
1.46k
                                             });
396
1.46k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
1.46k
                                         }},
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_EEEEEEvS3_
Line
Count
Source
388
6.15k
                                         [&](auto& agg_method) -> void {
389
6.15k
                                             auto& data = *agg_method.hash_table;
390
6.15k
                                             data.for_each_mapped([&](auto& mapped) {
391
6.15k
                                                 if (mapped) {
392
6.15k
                                                     static_cast<void>(_destroy_agg_status(mapped));
393
6.15k
                                                     mapped = nullptr;
394
6.15k
                                                 }
395
6.15k
                                             });
396
6.15k
                                             if (data.has_null_key_data()) {
397
0
                                                 auto st = _destroy_agg_status(
398
0
                                                         data.template get_null_key_data<
399
0
                                                                 vectorized::AggregateDataPtr>());
400
0
                                                 if (!st) {
401
0
                                                     throw Exception(st.code(), st.to_string());
402
0
                                                 }
403
0
                                             }
404
6.15k
                                         }},
405
85.8k
                   agg_data->method_variant);
406
85.8k
    }
407
408
134k
    void _close_without_key() {
409
        //because prepare maybe failed, and couldn't create agg data.
410
        //but finally call close to destory agg data, if agg data has bitmapValue
411
        //will be core dump, it's not initialized
412
134k
        if (agg_data_created_without_key) {
413
134k
            static_cast<void>(_destroy_agg_status(agg_data->without_key));
414
134k
            agg_data_created_without_key = false;
415
134k
        }
416
134k
    }
417
    Status _destroy_agg_status(vectorized::AggregateDataPtr data);
418
};
419
420
struct BasicSpillSharedState {
421
319k
    virtual ~BasicSpillSharedState() = default;
422
423
    // These two counters are shared to spill source operators as the initial value
424
    // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'.
425
    // Total bytes of spill data written to disk file(after serialized)
426
    RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
427
    RuntimeProfile::Counter* _spill_file_total_count = nullptr;
428
429
319k
    void setup_shared_profile(RuntimeProfile* sink_profile) {
430
319k
        _spill_file_total_count =
431
319k
                ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1);
432
319k
        _spill_write_file_total_size =
433
319k
                ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileBytes", TUnit::BYTES, 1);
434
319k
    }
435
436
    virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0;
437
};
438
439
struct AggSpillPartition;
440
struct PartitionedAggSharedState : public BasicSharedState,
441
                                   public BasicSpillSharedState,
442
                                   public std::enable_shared_from_this<PartitionedAggSharedState> {
443
    ENABLE_FACTORY_CREATOR(PartitionedAggSharedState)
444
445
44.1k
    PartitionedAggSharedState() = default;
446
44.1k
    ~PartitionedAggSharedState() override = default;
447
448
    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
449
450
    void init_spill_params(size_t spill_partition_count);
451
452
    void close();
453
454
    AggSharedState* in_mem_shared_state = nullptr;
455
    std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
456
457
    size_t partition_count;
458
    size_t max_partition_index;
459
    bool is_spilled = false;
460
    std::atomic_bool is_closed = false;
461
    std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
462
463
1.72M
    size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; }
464
};
465
466
struct AggSpillPartition {
467
    static constexpr int64_t AGG_SPILL_FILE_SIZE = 1024 * 1024 * 1024; // 1G
468
469
1.41M
    AggSpillPartition() = default;
470
471
    void close();
472
473
    Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile,
474
                            vectorized::SpillStreamSPtr& spilling_stream);
475
476
117k
    Status flush_if_full() {
477
117k
        DCHECK(spilling_stream_);
478
117k
        Status status;
479
        // avoid small spill files
480
117k
        if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
481
0
            status = spilling_stream_->spill_eof();
482
0
            spilling_stream_.reset();
483
0
        }
484
117k
        return status;
485
117k
    }
486
487
648k
    Status finish_current_spilling(bool eos = false) {
488
648k
        if (spilling_stream_) {
489
167k
            if (eos || spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) {
490
27.6k
                auto status = spilling_stream_->spill_eof();
491
27.6k
                spilling_stream_.reset();
492
27.6k
                return status;
493
27.6k
            }
494
167k
        }
495
620k
        return Status::OK();
496
648k
    }
497
498
    std::deque<vectorized::SpillStreamSPtr> spill_streams_;
499
    vectorized::SpillStreamSPtr spilling_stream_;
500
};
501
using AggSpillPartitionSPtr = std::shared_ptr<AggSpillPartition>;
502
struct SortSharedState : public BasicSharedState {
503
    ENABLE_FACTORY_CREATOR(SortSharedState)
504
public:
505
    std::shared_ptr<vectorized::Sorter> sorter;
506
};
507
508
struct SpillSortSharedState : public BasicSharedState,
509
                              public BasicSpillSharedState,
510
                              public std::enable_shared_from_this<SpillSortSharedState> {
511
    ENABLE_FACTORY_CREATOR(SpillSortSharedState)
512
513
237k
    SpillSortSharedState() = default;
514
236k
    ~SpillSortSharedState() override = default;
515
516
179k
    void update_spill_block_batch_row_count(RuntimeState* state, const vectorized::Block* block) {
517
179k
        auto rows = block->rows();
518
179k
        if (rows > 0 && 0 == avg_row_bytes) {
519
112k
            avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows);
520
112k
            spill_block_batch_row_count =
521
112k
                    (state->spill_sort_batch_bytes() + avg_row_bytes - 1) / avg_row_bytes;
522
112k
            LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count;
523
112k
        }
524
179k
    }
525
526
    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
527
528
    void close();
529
530
    SortSharedState* in_mem_shared_state = nullptr;
531
    bool enable_spill = false;
532
    bool is_spilled = false;
533
    std::atomic_bool is_closed = false;
534
    std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
535
536
    std::deque<vectorized::SpillStreamSPtr> sorted_streams;
537
    size_t avg_row_bytes = 0;
538
    size_t spill_block_batch_row_count;
539
};
540
541
struct UnionSharedState : public BasicSharedState {
542
    ENABLE_FACTORY_CREATOR(UnionSharedState)
543
544
public:
545
4.61k
    UnionSharedState(int child_count = 1) : data_queue(child_count), _child_count(child_count) {};
546
0
    int child_count() const { return _child_count; }
547
    DataQueue data_queue;
548
    const int _child_count;
549
};
550
551
struct DataQueueSharedState : public BasicSharedState {
552
    ENABLE_FACTORY_CREATOR(DataQueueSharedState)
553
public:
554
    DataQueue data_queue;
555
};
556
557
class MultiCastDataStreamer;
558
559
struct MultiCastSharedState : public BasicSharedState,
560
                              public BasicSpillSharedState,
561
                              public std::enable_shared_from_this<MultiCastSharedState> {
562
    MultiCastSharedState(ObjectPool* pool, int cast_sender_count, int node_id);
563
    std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
564
565
    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
566
};
567
568
struct AnalyticSharedState : public BasicSharedState {
569
    ENABLE_FACTORY_CREATOR(AnalyticSharedState)
570
571
public:
572
21.7k
    AnalyticSharedState() = default;
573
    std::queue<vectorized::Block> blocks_buffer;
574
    std::mutex buffer_mutex;
575
    bool sink_eos = false;
576
    std::mutex sink_eos_lock;
577
};
578
579
struct JoinSharedState : public BasicSharedState {
580
    // For some join case, we can apply a short circuit strategy
581
    // 1. _has_null_in_build_side = true
582
    // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
583
    bool _has_null_in_build_side = false;
584
    bool short_circuit_for_probe = false;
585
    // for some join, when build side rows is empty, we could return directly by add some additional null data in probe table.
586
    bool empty_right_table_need_probe_dispose = false;
587
    JoinOpVariants join_op_variants;
588
};
589
590
struct HashJoinSharedState : public JoinSharedState {
591
    ENABLE_FACTORY_CREATOR(HashJoinSharedState)
592
188k
    HashJoinSharedState() {
593
188k
        hash_table_variant_vector.push_back(std::make_shared<JoinDataVariants>());
594
188k
    }
595
5.14k
    HashJoinSharedState(int num_instances) {
596
5.14k
        source_deps.resize(num_instances, nullptr);
597
5.14k
        hash_table_variant_vector.resize(num_instances, nullptr);
598
31.9k
        for (int i = 0; i < num_instances; i++) {
599
26.7k
            hash_table_variant_vector[i] = std::make_shared<JoinDataVariants>();
600
26.7k
        }
601
5.14k
    }
602
    std::shared_ptr<vectorized::Arena> arena = std::make_shared<vectorized::Arena>();
603
604
    const std::vector<TupleDescriptor*> build_side_child_desc;
605
    size_t build_exprs_size = 0;
606
    std::shared_ptr<vectorized::Block> build_block;
607
    std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
608
609
    // Used by shared hash table
610
    // For probe operator, hash table in _hash_table_variants is read-only if visited flags is not
611
    // used. (visited flags will be used only in right / full outer join).
612
    //
613
    // For broadcast join, although hash table is read-only, some states in `_hash_table_variants`
614
    // are still could be written. For example, serialized keys will be written in a continuous
615
    // memory in `_hash_table_variants`. So before execution, we should use a local _hash_table_variants
616
    // which has a shared hash table in it.
617
    std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
618
};
619
620
struct PartitionedHashJoinSharedState
621
        : public HashJoinSharedState,
622
          public BasicSpillSharedState,
623
          public std::enable_shared_from_this<PartitionedHashJoinSharedState> {
624
    ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState)
625
626
35.7k
    void update_spill_stream_profiles(RuntimeProfile* source_profile) override {
627
1.14M
        for (auto& stream : spilled_streams) {
628
1.14M
            if (stream) {
629
1.14M
                stream->update_shared_profiles(source_profile);
630
1.14M
            }
631
1.14M
        }
632
35.7k
    }
633
634
    std::unique_ptr<RuntimeState> inner_runtime_state;
635
    std::shared_ptr<HashJoinSharedState> inner_shared_state;
636
    std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks;
637
    std::vector<vectorized::SpillStreamSPtr> spilled_streams;
638
    bool need_to_spill = false;
639
};
640
641
struct NestedLoopJoinSharedState : public JoinSharedState {
642
    ENABLE_FACTORY_CREATOR(NestedLoopJoinSharedState)
643
    // if true, left child has no more rows to process
644
    bool left_side_eos = false;
645
    // Visited flags for each row in build side.
646
    vectorized::MutableColumns build_side_visited_flags;
647
    // List of build blocks, constructed in prepare()
648
    vectorized::Blocks build_blocks;
649
};
650
651
struct PartitionSortNodeSharedState : public BasicSharedState {
652
    ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState)
653
public:
654
    std::queue<vectorized::Block> blocks_buffer;
655
    std::mutex buffer_mutex;
656
    std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
657
    bool sink_eos = false;
658
    std::mutex sink_eos_lock;
659
    std::mutex prepared_finish_lock;
660
};
661
662
struct SetSharedState : public BasicSharedState {
663
    ENABLE_FACTORY_CREATOR(SetSharedState)
664
public:
665
    /// default init
666
    vectorized::Block build_block; // build to source
667
    //record element size in hashtable
668
    int64_t valid_element_in_hash_tbl = 0;
669
    //first: idx mapped to column types
670
    //second: column_id, could point to origin column or cast column
671
    std::unordered_map<int, int> build_col_idx;
672
673
    //// shared static states (shared, decided in prepare/open...)
674
675
    /// init in setup_local_state
676
    std::unique_ptr<SetDataVariants> hash_table_variants =
677
            std::make_unique<SetDataVariants>(); // the real data HERE.
678
    std::vector<bool> build_not_ignore_null;
679
680
    // The SET operator's child might have different nullable attributes.
681
    // If a calculation involves both nullable and non-nullable columns, the final output should be a nullable column
682
    Status update_build_not_ignore_null(const vectorized::VExprContextSPtrs& ctxs);
683
684
    size_t get_hash_table_size() const;
685
    /// init in both upstream side.
686
    //The i-th result expr list refers to the i-th child.
687
    std::vector<vectorized::VExprContextSPtrs> child_exprs_lists;
688
689
    /// init in build side
690
    size_t child_quantity;
691
    vectorized::VExprContextSPtrs build_child_exprs;
692
    std::vector<Dependency*> probe_finished_children_dependency;
693
694
    /// init in probe side
695
    std::vector<vectorized::VExprContextSPtrs> probe_child_exprs_lists;
696
697
    std::atomic<bool> ready_for_read = false;
698
699
    /// called in setup_local_state
700
    Status hash_table_init();
701
};
702
703
enum class ExchangeType : uint8_t {
704
    NOOP = 0,
705
    // Shuffle data by Crc32HashPartitioner<LocalExchangeChannelIds>.
706
    HASH_SHUFFLE = 1,
707
    // Round-robin passthrough data blocks.
708
    PASSTHROUGH = 2,
709
    // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine).
710
    BUCKET_HASH_SHUFFLE = 3,
711
    // Passthrough data blocks to all channels.
712
    BROADCAST = 4,
713
    // Passthrough data to channels evenly in an adaptive way.
714
    ADAPTIVE_PASSTHROUGH = 5,
715
    // Send all data to the first channel.
716
    PASS_TO_ONE = 6,
717
};
718
719
214k
inline std::string get_exchange_type_name(ExchangeType idx) {
720
214k
    switch (idx) {
721
13
    case ExchangeType::NOOP:
722
13
        return "NOOP";
723
10.8k
    case ExchangeType::HASH_SHUFFLE:
724
10.8k
        return "HASH_SHUFFLE";
725
186k
    case ExchangeType::PASSTHROUGH:
726
186k
        return "PASSTHROUGH";
727
6.14k
    case ExchangeType::BUCKET_HASH_SHUFFLE:
728
6.14k
        return "BUCKET_HASH_SHUFFLE";
729
636
    case ExchangeType::BROADCAST:
730
636
        return "BROADCAST";
731
4.93k
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
732
4.93k
        return "ADAPTIVE_PASSTHROUGH";
733
5.84k
    case ExchangeType::PASS_TO_ONE:
734
5.84k
        return "PASS_TO_ONE";
735
214k
    }
736
0
    throw Exception(Status::FatalError("__builtin_unreachable"));
737
214k
}
738
739
struct DataDistribution {
740
3.25M
    DataDistribution(ExchangeType type) : distribution_type(type) {}
741
    DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
742
152k
            : distribution_type(type), partition_exprs(partition_exprs_) {}
743
239k
    DataDistribution(const DataDistribution& other) = default;
744
916k
    bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
745
1.02M
    DataDistribution& operator=(const DataDistribution& other) = default;
746
    ExchangeType distribution_type;
747
    std::vector<TExpr> partition_exprs;
748
};
749
750
class ExchangerBase;
751
752
struct LocalExchangeSharedState : public BasicSharedState {
753
public:
754
    ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
755
    LocalExchangeSharedState(int num_instances);
756
    ~LocalExchangeSharedState() override;
757
    std::unique_ptr<ExchangerBase> exchanger {};
758
    std::vector<RuntimeProfile::Counter*> mem_counters;
759
    std::atomic<int64_t> mem_usage = 0;
760
    std::atomic<size_t> _buffer_mem_limit = config::local_exchange_buffer_mem_limit;
761
    // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
762
    std::mutex le_lock;
763
    void sub_running_sink_operators();
764
    void sub_running_source_operators();
765
214k
    void _set_always_ready() {
766
1.34M
        for (auto& dep : source_deps) {
767
1.34M
            DCHECK(dep);
768
1.34M
            dep->set_always_ready();
769
1.34M
        }
770
214k
        for (auto& dep : sink_deps) {
771
214k
            DCHECK(dep);
772
214k
            dep->set_always_ready();
773
214k
        }
774
214k
    }
775
776
290k
    Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; }
777
778
495k
    void set_ready_to_read(int channel_id) {
779
495k
        auto& dep = source_deps[channel_id];
780
18.4E
        DCHECK(dep) << channel_id;
781
495k
        dep->set_ready();
782
495k
    }
783
784
495k
    void add_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(delta); }
785
786
495k
    void sub_mem_usage(int channel_id, size_t delta) {
787
495k
        mem_counters[channel_id]->update(-(int64_t)delta);
788
495k
    }
789
790
412k
    void add_total_mem_usage(size_t delta) {
791
412k
        if (cast_set<int64_t>(mem_usage.fetch_add(delta) + delta) > _buffer_mem_limit) {
792
17
            sink_deps.front()->block();
793
17
        }
794
412k
    }
795
796
412k
    void sub_total_mem_usage(size_t delta) {
797
412k
        auto prev_usage = mem_usage.fetch_sub(delta);
798
412k
        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta;
799
412k
        if (cast_set<int64_t>(prev_usage - delta) <= _buffer_mem_limit) {
800
412k
            sink_deps.front()->set_ready();
801
412k
        }
802
412k
    }
803
804
69.7k
    void set_low_memory_mode(RuntimeState* state) {
805
69.7k
        _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit,
806
69.7k
                                              state->low_memory_mode_buffer_limit());
807
69.7k
    }
808
};
809
810
//struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
811
//    ENABLE_FACTORY_CREATOR(LocalMergeExchangeSharedState);
812
//    LocalMergeExchangeSharedState(int num_instances)
813
//            : LocalExchangeSharedState(num_instances),
814
//              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {}
815
//
816
//    void create_dependencies(int local_exchange_id) override {
817
//        sink_deps.resize(source_deps.size());
818
//        for (size_t i = 0; i < source_deps.size(); i++) {
819
//            source_deps[i] =
820
//                    std::make_shared<Dependency>(local_exchange_id, local_exchange_id,
821
//                                                 "LOCAL_MERGE_EXCHANGE_OPERATOR_DEPENDENCY");
822
//            source_deps[i]->set_shared_state(this);
823
//            sink_deps[i] = std::make_shared<Dependency>(
824
//                    local_exchange_id, local_exchange_id,
825
//                    "LOCAL_MERGE_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
826
//            sink_deps[i]->set_shared_state(this);
827
//        }
828
//    }
829
//
830
//    void sub_total_mem_usage(size_t delta) override { mem_usage.fetch_sub(delta); }
831
//    void add_total_mem_usage(size_t delta) override { mem_usage.fetch_add(delta); }
832
//
833
//    void add_mem_usage(int channel_id, size_t delta) override {
834
//        LocalExchangeSharedState::add_mem_usage(channel_id, delta);
835
//        if (mem_counters[channel_id]->value() > _each_queue_limit.load()) {
836
//            sink_deps[channel_id]->block();
837
//        }
838
//    }
839
//
840
//    void sub_mem_usage(int channel_id, size_t delta) override {
841
//        LocalExchangeSharedState::sub_mem_usage(channel_id, delta);
842
//        if (mem_counters[channel_id]->value() <= _each_queue_limit.load()) {
843
//            sink_deps[channel_id]->set_ready();
844
//        }
845
//    }
846
//
847
//    void set_low_memory_mode(RuntimeState* state) override {
848
//        _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit,
849
//                                              state->low_memory_mode_buffer_limit());
850
//        _each_queue_limit = std::max<int64_t>(64 * 1024, _buffer_mem_limit / source_deps.size());
851
//    }
852
//
853
//    Dependency* get_sink_dep_by_channel_id(int channel_id) override {
854
//        return sink_deps[channel_id].get();
855
//    }
856
//
857
//    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) override {
858
//        return source_deps;
859
//    }
860
//
861
//private:
862
//    std::atomic_int64_t _each_queue_limit;
863
//};
864
865
//class QueryGlobalDependency final : public Dependency {
866
//    ENABLE_FACTORY_CREATOR(QueryGlobalDependency);
867
//    QueryGlobalDependency(std::string name, bool ready = false) : Dependency(-1, -1, name, ready) {}
868
//    ~QueryGlobalDependency() override = default;
869
//    Dependency* is_blocked_by(PipelineTask* task = nullptr) override;
870
//};
871
872
struct FetchRpcStruct {
873
    std::shared_ptr<PBackendService_Stub> stub;
874
    PMultiGetRequestV2 request;
875
    std::shared_ptr<doris::DummyBrpcCallback<PMultiGetResponseV2>> callback;
876
    MonotonicStopWatch rpc_timer;
877
};
878
879
struct MaterializationSharedState : public BasicSharedState {
880
    ENABLE_FACTORY_CREATOR(MaterializationSharedState)
881
public:
882
1.33k
    MaterializationSharedState() = default;
883
884
    Status init_multi_requests(const TMaterializationNode& tnode, RuntimeState* state);
885
    Status create_muiltget_result(const vectorized::Columns& columns, bool eos, bool gc_id_map);
886
    Status merge_multi_response(vectorized::Block* block);
887
888
    Dependency* create_source_dependency(int operator_id, int node_id,
889
                                         const std::string& name) override;
890
891
    bool rpc_struct_inited = false;
892
    Status rpc_status = Status::OK();
893
    bool last_block = false;
894
    // empty materialization sink block not need to merge block
895
    bool need_merge_block = true;
896
    vectorized::Block origin_block;
897
    // The rowid column of the origin block. should be replaced by the column of the result block.
898
    std::vector<int> rowid_locs;
899
    std::vector<vectorized::MutableBlock> response_blocks;
900
    std::map<int64_t, FetchRpcStruct> rpc_struct_map;
901
    // Register each line in which block to ensure the order of the result.
902
    // Zero means NULL value.
903
    std::vector<std::vector<int64_t>> block_order_results;
904
};
905
#include "common/compile_check_end.h"
906
} // namespace doris::pipeline