/root/doris/be/src/pipeline/dependency.h
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #pragma once | 
| 19 |  |  | 
| 20 |  | #ifdef __APPLE__ | 
| 21 |  | #include <netinet/in.h> | 
| 22 |  | #include <sys/_types/_u_int.h> | 
| 23 |  | #endif | 
| 24 |  |  | 
| 25 |  | #include <concurrentqueue.h> | 
| 26 |  | #include <sqltypes.h> | 
| 27 |  |  | 
| 28 |  | #include <atomic> | 
| 29 |  | #include <functional> | 
| 30 |  | #include <memory> | 
| 31 |  | #include <mutex> | 
| 32 |  | #include <thread> | 
| 33 |  | #include <utility> | 
| 34 |  |  | 
| 35 |  | #include "common/config.h" | 
| 36 |  | #include "common/logging.h" | 
| 37 |  | #include "gen_cpp/internal_service.pb.h" | 
| 38 |  | #include "pipeline/common/agg_utils.h" | 
| 39 |  | #include "pipeline/common/join_utils.h" | 
| 40 |  | #include "pipeline/common/set_utils.h" | 
| 41 |  | #include "pipeline/exec/data_queue.h" | 
| 42 |  | #include "pipeline/exec/join/process_hash_table_probe.h" | 
| 43 |  | #include "util/brpc_closure.h" | 
| 44 |  | #include "util/stack_util.h" | 
| 45 |  | #include "vec/common/sort/partition_sorter.h" | 
| 46 |  | #include "vec/common/sort/sorter.h" | 
| 47 |  | #include "vec/core/block.h" | 
| 48 |  | #include "vec/core/types.h" | 
| 49 |  | #include "vec/spill/spill_stream.h" | 
| 50 |  |  | 
| 51 |  | namespace doris::vectorized { | 
| 52 |  | class AggFnEvaluator; | 
| 53 |  | class VSlotRef; | 
| 54 |  | } // namespace doris::vectorized | 
| 55 |  |  | 
| 56 |  | namespace doris::pipeline { | 
| 57 |  | #include "common/compile_check_begin.h" | 
| 58 |  | class Dependency; | 
| 59 |  | class PipelineTask; | 
| 60 |  | struct BasicSharedState; | 
| 61 |  | using DependencySPtr = std::shared_ptr<Dependency>; | 
| 62 |  | class LocalExchangeSourceLocalState; | 
| 63 |  |  | 
| 64 |  | static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L; | 
| 65 |  | static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L; | 
| 66 |  | static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD); | 
| 67 |  |  | 
| 68 |  | struct BasicSharedState { | 
| 69 |  |     ENABLE_FACTORY_CREATOR(BasicSharedState) | 
| 70 |  |  | 
| 71 |  |     template <class TARGET> | 
| 72 | 96.4k |     TARGET* cast() { | 
| 73 | 96.4k |         DCHECK(dynamic_cast<TARGET*>(this)) | 
| 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() | 
| 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); | 
| 76 | 96.4k |         return reinterpret_cast<TARGET*>(this); | 
| 77 | 96.4k |     } _ZN5doris8pipeline16BasicSharedState4castINS0_19HashJoinSharedStateEEEPT_v| Line | Count | Source |  | 72 | 96.0k |     TARGET* cast() { |  | 73 | 96.0k |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 96.0k |         return reinterpret_cast<TARGET*>(this); |  | 77 | 96.0k |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_30PartitionedHashJoinSharedStateEEEPT_v| Line | Count | Source |  | 72 | 3 |     TARGET* cast() { |  | 73 | 3 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 3 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 3 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_15SortSharedStateEEEPT_v| Line | Count | Source |  | 72 | 34 |     TARGET* cast() { |  | 73 | 34 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 34 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 34 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_20SpillSortSharedStateEEEPT_v| Line | Count | Source |  | 72 | 13 |     TARGET* cast() { |  | 73 | 13 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 13 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 13 |     } | 
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_25NestedLoopJoinSharedStateEEEPT_v_ZN5doris8pipeline16BasicSharedState4castINS0_19AnalyticSharedStateEEEPT_v| Line | Count | Source |  | 72 | 18 |     TARGET* cast() { |  | 73 | 18 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 18 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 18 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_14AggSharedStateEEEPT_v| Line | Count | Source |  | 72 | 70 |     TARGET* cast() { |  | 73 | 70 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 70 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 70 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_25PartitionedAggSharedStateEEEPT_v| Line | Count | Source |  | 72 | 16 |     TARGET* cast() { |  | 73 | 16 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 16 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 16 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_16UnionSharedStateEEEPT_v| Line | Count | Source |  | 72 | 4 |     TARGET* cast() { |  | 73 | 4 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 4 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 4 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_28PartitionSortNodeSharedStateEEEPT_v| Line | Count | Source |  | 72 | 204 |     TARGET* cast() { |  | 73 | 204 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 204 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 204 |     } | 
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_20MultiCastSharedStateEEEPT_v_ZN5doris8pipeline16BasicSharedState4castINS0_14SetSharedStateEEEPT_v| Line | Count | Source |  | 72 | 33 |     TARGET* cast() { |  | 73 | 33 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 33 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 33 |     } | 
Unexecuted instantiation: _ZN5doris8pipeline16BasicSharedState4castINS0_24LocalExchangeSharedStateEEEPT_v_ZN5doris8pipeline16BasicSharedState4castIS1_EEPT_v| Line | Count | Source |  | 72 | 10 |     TARGET* cast() { |  | 73 | 10 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 10 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 10 |     } | 
_ZN5doris8pipeline16BasicSharedState4castINS0_20DataQueueSharedStateEEEPT_v| Line | Count | Source |  | 72 | 6 |     TARGET* cast() { |  | 73 | 6 |         DCHECK(dynamic_cast<TARGET*>(this)) |  | 74 | 0 |                 << " Mismatch type! Current type is " << typeid(*this).name() |  | 75 | 0 |                 << " and expect type is" << typeid(TARGET).name(); |  | 76 | 6 |         return reinterpret_cast<TARGET*>(this); |  | 77 | 6 |     } | 
 | 
| 78 |  |     template <class TARGET> | 
| 79 |  |     const TARGET* cast() const { | 
| 80 |  |         DCHECK(dynamic_cast<const TARGET*>(this)) | 
| 81 |  |                 << " Mismatch type! Current type is " << typeid(*this).name() | 
| 82 |  |                 << " and expect type is" << typeid(TARGET).name(); | 
| 83 |  |         return reinterpret_cast<const TARGET*>(this); | 
| 84 |  |     } | 
| 85 |  |     std::vector<DependencySPtr> source_deps; | 
| 86 |  |     std::vector<DependencySPtr> sink_deps; | 
| 87 |  |     int id = 0; | 
| 88 |  |     std::set<int> related_op_ids; | 
| 89 |  |  | 
| 90 | 72.3k |     virtual ~BasicSharedState() = default; | 
| 91 |  |  | 
| 92 |  |     void create_source_dependencies(int num_sources, int operator_id, int node_id, | 
| 93 |  |                                     const std::string& name); | 
| 94 |  |     Dependency* create_source_dependency(int operator_id, int node_id, const std::string& name); | 
| 95 |  |  | 
| 96 |  |     Dependency* create_sink_dependency(int dest_id, int node_id, const std::string& name); | 
| 97 | 24 |     std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) { | 
| 98 | 24 |         DCHECK_LT(channel_id, source_deps.size()); | 
| 99 | 24 |         return {source_deps[channel_id]}; | 
| 100 | 24 |     } | 
| 101 |  | }; | 
| 102 |  |  | 
| 103 |  | class Dependency : public std::enable_shared_from_this<Dependency> { | 
| 104 |  | public: | 
| 105 |  |     ENABLE_FACTORY_CREATOR(Dependency); | 
| 106 |  |     Dependency(int id, int node_id, std::string name, bool ready = false) | 
| 107 | 488k |             : _id(id), _node_id(node_id), _name(std::move(name)), _ready(ready) {} | 
| 108 | 488k |     virtual ~Dependency() = default; | 
| 109 |  |  | 
| 110 | 0 |     [[nodiscard]] int id() const { return _id; } | 
| 111 | 96.5k |     [[nodiscard]] virtual std::string name() const { return _name; } | 
| 112 | 4 |     BasicSharedState* shared_state() { return _shared_state; } | 
| 113 | 144k |     void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; } | 
| 114 |  |     virtual std::string debug_string(int indentation_level = 0); | 
| 115 | 645M |     bool ready() const { return _ready; } | 
| 116 |  |  | 
| 117 |  |     // Start the watcher. We use it to count how long this dependency block the current pipeline task. | 
| 118 | 23 |     void start_watcher() { _watcher.start(); } | 
| 119 | 96.1k |     [[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); } | 
| 120 |  |  | 
| 121 |  |     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready. | 
| 122 |  |     [[nodiscard]] Dependency* is_blocked_by(std::shared_ptr<PipelineTask> task = nullptr); | 
| 123 |  |     // Notify downstream pipeline tasks this dependency is ready. | 
| 124 |  |     void set_ready(); | 
| 125 | 48.6k |     void set_ready_to_read(int channel_id = 0) { | 
| 126 | 48.6k |         DCHECK_LT(channel_id, _shared_state->source_deps.size()) << debug_string(); | 
| 127 | 48.6k |         _shared_state->source_deps[channel_id]->set_ready(); | 
| 128 | 48.6k |     } | 
| 129 | 0 |     void set_ready_to_write() { | 
| 130 | 0 |         DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string(); | 
| 131 | 0 |         _shared_state->sink_deps.front()->set_ready(); | 
| 132 | 0 |     } | 
| 133 |  |  | 
| 134 |  |     // Notify downstream pipeline tasks this dependency is blocked. | 
| 135 | 1.84k |     void block() { | 
| 136 | 1.84k |         if (_always_ready) { | 
| 137 | 10 |             return; | 
| 138 | 10 |         } | 
| 139 | 1.83k |         std::unique_lock<std::mutex> lc(_always_ready_lock); | 
| 140 | 1.83k |         if (_always_ready) { | 
| 141 | 0 |             return; | 
| 142 | 0 |         } | 
| 143 | 1.83k |         _ready = false; | 
| 144 | 1.83k |     } | 
| 145 |  |  | 
| 146 | 131 |     void set_always_ready() { | 
| 147 | 131 |         if (_always_ready) { | 
| 148 | 35 |             return; | 
| 149 | 35 |         } | 
| 150 | 96 |         std::unique_lock<std::mutex> lc(_always_ready_lock); | 
| 151 | 96 |         if (_always_ready) { | 
| 152 | 0 |             return; | 
| 153 | 0 |         } | 
| 154 | 96 |         _always_ready = true; | 
| 155 | 96 |         set_ready(); | 
| 156 | 96 |     } | 
| 157 |  |  | 
| 158 |  | protected: | 
| 159 |  |     void _add_block_task(std::shared_ptr<PipelineTask> task); | 
| 160 |  |  | 
| 161 |  |     const int _id; | 
| 162 |  |     const int _node_id; | 
| 163 |  |     const std::string _name; | 
| 164 |  |     std::atomic<bool> _ready; | 
| 165 |  |  | 
| 166 |  |     BasicSharedState* _shared_state = nullptr; | 
| 167 |  |     MonotonicStopWatch _watcher; | 
| 168 |  |  | 
| 169 |  |     std::mutex _task_lock; | 
| 170 |  |     std::vector<std::weak_ptr<PipelineTask>> _blocked_task; | 
| 171 |  |  | 
| 172 |  |     // If `_always_ready` is true, `block()` will never block tasks. | 
| 173 |  |     std::atomic<bool> _always_ready = false; | 
| 174 |  |     std::mutex _always_ready_lock; | 
| 175 |  | }; | 
| 176 |  |  | 
| 177 |  | struct FakeSharedState final : public BasicSharedState { | 
| 178 |  |     ENABLE_FACTORY_CREATOR(FakeSharedState) | 
| 179 |  | }; | 
| 180 |  |  | 
| 181 |  | class CountedFinishDependency final : public Dependency { | 
| 182 |  | public: | 
| 183 |  |     using SharedState = FakeSharedState; | 
| 184 |  |     CountedFinishDependency(int id, int node_id, std::string name) | 
| 185 | 96.0k |             : Dependency(id, node_id, std::move(name), true) {} | 
| 186 |  |  | 
| 187 | 8 |     void add(uint32_t count = 1) { | 
| 188 | 8 |         std::unique_lock<std::mutex> l(_mtx); | 
| 189 | 8 |         if (!_counter) { | 
| 190 | 7 |             block(); | 
| 191 | 7 |         } | 
| 192 | 8 |         _counter += count; | 
| 193 | 8 |     } | 
| 194 |  |  | 
| 195 | 7 |     void sub() { | 
| 196 | 7 |         std::unique_lock<std::mutex> l(_mtx); | 
| 197 | 7 |         _counter--; | 
| 198 | 7 |         if (!_counter) { | 
| 199 | 6 |             set_ready(); | 
| 200 | 6 |         } | 
| 201 | 7 |     } | 
| 202 |  |  | 
| 203 |  |     std::string debug_string(int indentation_level = 0) override; | 
| 204 |  |  | 
| 205 |  | private: | 
| 206 |  |     std::mutex _mtx; | 
| 207 |  |     uint32_t _counter = 0; | 
| 208 |  | }; | 
| 209 |  |  | 
| 210 |  | struct RuntimeFilterTimerQueue; | 
| 211 |  | class RuntimeFilterTimer { | 
| 212 |  | public: | 
| 213 |  |     RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, | 
| 214 |  |                        std::shared_ptr<Dependency> parent, bool force_wait_timeout = false) | 
| 215 | 2 |             : _parent(std::move(parent)), | 
| 216 | 2 |               _registration_time(registration_time), | 
| 217 | 2 |               _wait_time_ms(wait_time_ms), | 
| 218 | 2 |               _force_wait_timeout(force_wait_timeout) {} | 
| 219 |  |  | 
| 220 |  |     // Called by runtime filter producer. | 
| 221 |  |     void call_ready(); | 
| 222 |  |  | 
| 223 |  |     // Called by RuntimeFilterTimerQueue which is responsible for checking if this rf is timeout. | 
| 224 |  |     void call_timeout(); | 
| 225 |  |  | 
| 226 | 2 |     int64_t registration_time() const { return _registration_time; } | 
| 227 | 2 |     int32_t wait_time_ms() const { return _wait_time_ms; } | 
| 228 |  |  | 
| 229 |  |     void set_local_runtime_filter_dependencies( | 
| 230 | 0 |             const std::vector<std::shared_ptr<Dependency>>& deps) { | 
| 231 | 0 |         _local_runtime_filter_dependencies = deps; | 
| 232 | 0 |     } | 
| 233 |  |  | 
| 234 |  |     bool should_be_check_timeout(); | 
| 235 |  |  | 
| 236 | 2 |     bool force_wait_timeout() { return _force_wait_timeout; } | 
| 237 |  |  | 
| 238 |  | private: | 
| 239 |  |     friend struct RuntimeFilterTimerQueue; | 
| 240 |  |     std::shared_ptr<Dependency> _parent = nullptr; | 
| 241 |  |     std::vector<std::shared_ptr<Dependency>> _local_runtime_filter_dependencies; | 
| 242 |  |     std::mutex _lock; | 
| 243 |  |     int64_t _registration_time; | 
| 244 |  |     const int32_t _wait_time_ms; | 
| 245 |  |     // true only for group_commit_scan_operator | 
| 246 |  |     bool _force_wait_timeout; | 
| 247 |  | }; | 
| 248 |  |  | 
| 249 |  | struct RuntimeFilterTimerQueue { | 
| 250 |  |     constexpr static int64_t interval = 10; | 
| 251 | 1 |     void run() { _thread.detach(); } | 
| 252 |  |     void start(); | 
| 253 |  |  | 
| 254 | 0 |     void stop() { | 
| 255 | 0 |         _stop = true; | 
| 256 | 0 |         cv.notify_all(); | 
| 257 | 0 |         wait_for_shutdown(); | 
| 258 | 0 |     } | 
| 259 |  |  | 
| 260 | 0 |     void wait_for_shutdown() const { | 
| 261 | 0 |         while (!_shutdown) { | 
| 262 | 0 |             std::this_thread::sleep_for(std::chrono::milliseconds(interval)); | 
| 263 | 0 |         } | 
| 264 | 0 |     } | 
| 265 |  |  | 
| 266 | 0 |     ~RuntimeFilterTimerQueue() = default; | 
| 267 | 1 |     RuntimeFilterTimerQueue() { _thread = std::thread(&RuntimeFilterTimerQueue::start, this); } | 
| 268 | 1 |     void push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&& filter) { | 
| 269 | 1 |         std::unique_lock<std::mutex> lc(_que_lock); | 
| 270 | 1 |         _que.insert(_que.end(), filter.begin(), filter.end()); | 
| 271 | 1 |         cv.notify_all(); | 
| 272 | 1 |     } | 
| 273 |  |  | 
| 274 |  |     std::thread _thread; | 
| 275 |  |     std::condition_variable cv; | 
| 276 |  |     std::mutex cv_m; | 
| 277 |  |     std::mutex _que_lock; | 
| 278 |  |     std::atomic_bool _stop = false; | 
| 279 |  |     std::atomic_bool _shutdown = false; | 
| 280 |  |     std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> _que; | 
| 281 |  | }; | 
| 282 |  |  | 
| 283 |  | struct AggSharedState : public BasicSharedState { | 
| 284 |  |     ENABLE_FACTORY_CREATOR(AggSharedState) | 
| 285 |  | public: | 
| 286 | 40 |     AggSharedState() { agg_data = std::make_unique<AggregatedDataVariants>(); } | 
| 287 | 40 |     ~AggSharedState() override { | 
| 288 | 40 |         if (!probe_expr_ctxs.empty()) { | 
| 289 | 30 |             _close_with_serialized_key(); | 
| 290 | 30 |         } else { | 
| 291 | 10 |             _close_without_key(); | 
| 292 | 10 |         } | 
| 293 | 40 |     } | 
| 294 |  |  | 
| 295 |  |     Status reset_hash_table(); | 
| 296 |  |  | 
| 297 |  |     bool do_limit_filter(vectorized::Block* block, size_t num_rows, | 
| 298 |  |                          const std::vector<int>* key_locs = nullptr); | 
| 299 |  |     void build_limit_heap(size_t hash_table_size); | 
| 300 |  |  | 
| 301 |  |     // We should call this function only at 1st phase. | 
| 302 |  |     // 1st phase: is_merge=true, only have one SlotRef. | 
| 303 |  |     // 2nd phase: is_merge=false, maybe have multiple exprs. | 
| 304 |  |     static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator); | 
| 305 |  |  | 
| 306 |  |     AggregatedDataVariantsUPtr agg_data = nullptr; | 
| 307 |  |     std::unique_ptr<AggregateDataContainer> aggregate_data_container; | 
| 308 |  |     std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators; | 
| 309 |  |     // group by k1,k2 | 
| 310 |  |     vectorized::VExprContextSPtrs probe_expr_ctxs; | 
| 311 |  |     size_t input_num_rows = 0; | 
| 312 |  |     std::vector<vectorized::AggregateDataPtr> values; | 
| 313 |  |     /// The total size of the row from the aggregate functions. | 
| 314 |  |     size_t total_size_of_aggregate_states = 0; | 
| 315 |  |     size_t align_aggregate_states = 1; | 
| 316 |  |     /// The offset to the n-th aggregate function in a row of aggregate functions. | 
| 317 |  |     vectorized::Sizes offsets_of_aggregate_states; | 
| 318 |  |     std::vector<size_t> make_nullable_keys; | 
| 319 |  |  | 
| 320 |  |     bool agg_data_created_without_key = false; | 
| 321 |  |     bool enable_spill = false; | 
| 322 |  |     bool reach_limit = false; | 
| 323 |  |  | 
| 324 |  |     int64_t limit = -1; | 
| 325 |  |     bool do_sort_limit = false; | 
| 326 |  |     vectorized::MutableColumns limit_columns; | 
| 327 |  |     int limit_columns_min = -1; | 
| 328 |  |     vectorized::PaddedPODArray<uint8_t> need_computes; | 
| 329 |  |     std::vector<uint8_t> cmp_res; | 
| 330 |  |     std::vector<int> order_directions; | 
| 331 |  |     std::vector<int> null_directions; | 
| 332 |  |  | 
| 333 |  |     struct HeapLimitCursor { | 
| 334 |  |         HeapLimitCursor(int row_id, vectorized::MutableColumns& limit_columns, | 
| 335 |  |                         std::vector<int>& order_directions, std::vector<int>& null_directions) | 
| 336 | 32 |                 : _row_id(row_id), | 
| 337 | 32 |                   _limit_columns(limit_columns), | 
| 338 | 32 |                   _order_directions(order_directions), | 
| 339 | 32 |                   _null_directions(null_directions) {} | 
| 340 |  |  | 
| 341 |  |         HeapLimitCursor(const HeapLimitCursor& other) = default; | 
| 342 |  |  | 
| 343 |  |         HeapLimitCursor(HeapLimitCursor&& other) noexcept | 
| 344 | 152 |                 : _row_id(other._row_id), | 
| 345 | 152 |                   _limit_columns(other._limit_columns), | 
| 346 | 152 |                   _order_directions(other._order_directions), | 
| 347 | 152 |                   _null_directions(other._null_directions) {} | 
| 348 |  |  | 
| 349 | 0 |         HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept { | 
| 350 | 0 |             _row_id = other._row_id; | 
| 351 | 0 |             return *this; | 
| 352 | 0 |         } | 
| 353 |  |  | 
| 354 | 129 |         HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept { | 
| 355 | 129 |             _row_id = other._row_id; | 
| 356 | 129 |             return *this; | 
| 357 | 129 |         } | 
| 358 |  |  | 
| 359 | 79 |         bool operator<(const HeapLimitCursor& rhs) const { | 
| 360 | 85 |             for (int i = 0; i < _limit_columns.size(); ++i) { | 
| 361 | 79 |                 const auto& _limit_column = _limit_columns[i]; | 
| 362 | 79 |                 auto res = _limit_column->compare_at(_row_id, rhs._row_id, *_limit_column, | 
| 363 | 79 |                                                      _null_directions[i]) * | 
| 364 | 79 |                            _order_directions[i]; | 
| 365 | 79 |                 if (res < 0) { | 
| 366 | 46 |                     return true; | 
| 367 | 46 |                 } else if (res > 0) { | 
| 368 | 27 |                     return false; | 
| 369 | 27 |                 } | 
| 370 | 79 |             } | 
| 371 | 6 |             return false; | 
| 372 | 79 |         } | 
| 373 |  |  | 
| 374 |  |         int _row_id; | 
| 375 |  |         vectorized::MutableColumns& _limit_columns; | 
| 376 |  |         std::vector<int>& _order_directions; | 
| 377 |  |         std::vector<int>& _null_directions; | 
| 378 |  |     }; | 
| 379 |  |  | 
| 380 |  |     std::priority_queue<HeapLimitCursor> limit_heap; | 
| 381 |  |  | 
| 382 |  |     // Refresh the top limit heap with a new row | 
| 383 |  |     void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs& key_columns); | 
| 384 |  |  | 
| 385 |  | private: | 
| 386 |  |     vectorized::MutableColumns _get_keys_hash_table(); | 
| 387 |  |  | 
| 388 | 30 |     void _close_with_serialized_key() { | 
| 389 | 30 |         std::visit(vectorized::Overload {[&](std::monostate& arg) -> void { | 
| 390 |  |                                              // Do nothing | 
| 391 | 0 |                                          }, | 
| 392 | 30 |                                          [&](auto& agg_method) -> void { | 
| 393 | 30 |                                              auto& data = *agg_method.hash_table; | 
| 394 | 91 |                                              data.for_each_mapped([&](auto& mapped) { | 
| 395 | 91 |                                                  if (mapped) { | 
| 396 | 91 |                                                      static_cast<void>(_destroy_agg_status(mapped)); | 
| 397 | 91 |                                                      mapped = nullptr; | 
| 398 | 91 |                                                  } | 
| 399 | 91 |                                              }); Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_EEEEEEvS3_ENKUlS3_E_clISA_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm256EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3_Unexecuted instantiation: 
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_ENKUlS3_E_clISE_EEDaS3__ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_| Line | Count | Source |  | 394 | 20 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 20 |                                                  if (mapped) { |  | 396 | 20 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 20 |                                                      mapped = nullptr; |  | 398 | 20 |                                                  } |  | 399 | 20 |                                              }); | 
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc9HashCRC32ItEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc9HashCRC32IhEEEEEEEEEEvS3_ENKUlS3_E_clISB_EEDaS3__ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_| Line | Count | Source |  | 394 | 55 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 55 |                                                  if (mapped) { |  | 396 | 55 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 55 |                                                      mapped = nullptr; |  | 398 | 55 |                                                  } |  | 399 | 55 |              
                                }); | 
_ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_| Line | Count | Source |  | 394 | 16 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 16 |                                                  if (mapped) { |  | 396 | 16 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 16 |                                                      mapped = nullptr; |  | 398 | 16 |                                                  } |  | 399 | 16 |                                              }); | 
Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm256EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_ENKUlS3_E_clISC_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc9HashCRC32ItEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc9HashCRC32IhEEEEEEvS3_ENKUlS3_E_clIS9_EEDaS3_Unexecuted instantiation: _ZZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vEEEEEEvS3_ENKUlS3_E_clISA_EEDaS3_ | 
| 400 | 30 |                                              if (data.has_null_key_data()) { | 
| 401 | 5 |                                                  auto st = _destroy_agg_status( | 
| 402 | 5 |                                                          data.template get_null_key_data< | 
| 403 | 5 |                                                                  vectorized::AggregateDataPtr>()); | 
| 404 | 5 |                                                  if (!st) { | 
| 405 | 0 |                                                      throw Exception(st.code(), st.to_string()); | 
| 406 | 0 |                                                  } | 
| 407 | 5 |                                              } | 
| 408 | 30 |                                          }}, Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapINS6_7UInt136EPc9HashCRC32IS9_EEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm256EjEEPc9HashCRC32ISB_EEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapIN4wide7integerILm128EjEEPc9HashCRC32ISB_EEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodKeysFixedI9PHHashMapImPc9HashCRC32ImEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_19MethodStringNoCacheINS6_15DataWithNullKeyINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm256EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIN4wide7integerILm128EjEENS6_15DataWithNullKeyI9PHHashMapISB_Pc9HashCRC32ISB_EEEEEEEEEEvS3__ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEEEEEvS3_| Line | Count | Source |  | 392 | 5 |                                          [&](auto& agg_method) -> void { |  | 393 | 5 |                       
                       auto& data = *agg_method.hash_table; |  | 394 | 5 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 5 |                                                  if (mapped) { |  | 396 | 5 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 5 |                                                      mapped = nullptr; |  | 398 | 5 |                                                  } |  | 399 | 5 |                                              }); |  | 400 | 5 |                                              if (data.has_null_key_data()) { |  | 401 | 5 |                                                  auto st = _destroy_agg_status( |  | 402 | 5 |                                                          data.template get_null_key_data< |  | 403 | 5 |                                                                  vectorized::AggregateDataPtr>()); |  | 404 | 5 |                                                  if (!st) { |  | 405 | 0 |                                                      throw Exception(st.code(), st.to_string()); |  | 406 | 0 |                                                  } |  | 407 | 5 |                                              } |  | 408 | 5 |                                          }}, | 
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberImNS6_15DataWithNullKeyI9PHHashMapImPc9HashCRC32ImEEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIjNS6_15DataWithNullKeyI9PHHashMapIjPc9HashCRC32IjEEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberItNS6_15DataWithNullKeyI9PHHashMapItPc9HashCRC32ItEEEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized26MethodSingleNullableColumnINS6_15MethodOneNumberIhNS6_15DataWithNullKeyI9PHHashMapIhPc9HashCRC32IhEEEEEEEEEEvS3__ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc14HashMixWrapperIm9HashCRC32ImEEEEEEEvS3_| Line | Count | Source |  | 392 | 14 |                                          [&](auto& agg_method) -> void { |  | 393 | 14 |                                              auto& data = *agg_method.hash_table; |  | 394 | 14 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 14 |                                                  if (mapped) { |  | 396 | 14 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 14 |                                                      mapped = nullptr; |  | 398 | 14 |                                      
            } |  | 399 | 14 |                                              }); |  | 400 | 14 |                                              if (data.has_null_key_data()) { |  | 401 | 0 |                                                  auto st = _destroy_agg_status( |  | 402 | 0 |                                                          data.template get_null_key_data< |  | 403 | 0 |                                                                  vectorized::AggregateDataPtr>()); |  | 404 | 0 |                                                  if (!st) { |  | 405 | 0 |                                                      throw Exception(st.code(), st.to_string()); |  | 406 | 0 |                                                  } |  | 407 | 0 |                                              } |  | 408 | 14 |                                          }}, | 
_ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc14HashMixWrapperIj9HashCRC32IjEEEEEEEvS3_| Line | Count | Source |  | 392 | 11 |                                          [&](auto& agg_method) -> void { |  | 393 | 11 |                                              auto& data = *agg_method.hash_table; |  | 394 | 11 |                                              data.for_each_mapped([&](auto& mapped) { |  | 395 | 11 |                                                  if (mapped) { |  | 396 | 11 |                                                      static_cast<void>(_destroy_agg_status(mapped)); |  | 397 | 11 |                                                      mapped = nullptr; |  | 398 | 11 |                                                  } |  | 399 | 11 |                                              }); |  | 400 | 11 |                                              if (data.has_null_key_data()) { |  | 401 | 0 |                                                  auto st = _destroy_agg_status( |  | 402 | 0 |                                                          data.template get_null_key_data< |  | 403 | 0 |                                                                  vectorized::AggregateDataPtr>()); |  | 404 | 0 |                                                  if (!st) { |  | 405 | 0 |                                                      throw Exception(st.code(), st.to_string()); |  | 406 | 0 |                                                  } |  | 407 | 0 |                                              } |  | 408 | 11 |                                          }}, | 
Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm256EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIN4wide7integerILm128EjEE9PHHashMapISA_Pc9HashCRC32ISA_EEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized19MethodStringNoCacheINS_13StringHashMapIPcNS_9AllocatorILb1ELb1ELb0ENS_22DefaultMemoryAllocatorELb1EEEEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIm9PHHashMapImPc9HashCRC32ImEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIj9PHHashMapIjPc9HashCRC32IjEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIt9PHHashMapItPc9HashCRC32ItEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized15MethodOneNumberIh9PHHashMapIhPc9HashCRC32IhEEEEEEvS3_Unexecuted instantiation: _ZZN5doris8pipeline14AggSharedState26_close_with_serialized_keyEvENKUlRT_E_clINS_10vectorized16MethodSerializedI9PHHashMapINS_9StringRefEPc11DefaultHashIS9_vEEEEEEvS3_ | 
| 409 | 30 |                    agg_data->method_variant); | 
| 410 | 30 |     } | 
| 411 |  |  | 
    // Destroys the single aggregate state used by the no-group-by path.
    void _close_without_key() {
        // Because prepare() may have failed, the agg data may never have been
        // created; unconditionally calling close on it (e.g. when the agg data
        // holds a BitmapValue that was never initialized) would core dump.
        // Only destroy the state when the creation flag was actually set.
        if (agg_data_created_without_key) {
            static_cast<void>(_destroy_agg_status(agg_data->without_key));
            agg_data_created_without_key = false;
        }
    }
| 421 |  |     Status _destroy_agg_status(vectorized::AggregateDataPtr data); | 
| 422 |  | }; | 
| 423 |  |  | 
// Base for shared states of spillable operators: owns the profile counters
// that sink and source sides both report into.
struct BasicSpillSharedState {
    virtual ~BasicSpillSharedState() = default;

    // These two counters are shared to spill source operators as the initial value
    // of 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount'.
    // Total bytes of spill data written to disk file(after serialized)
    RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
    // Total count of spill files written to disk.
    RuntimeProfile::Counter* _spill_file_total_count = nullptr;

    // Registers the two shared spill counters on the sink operator's profile.
    void setup_shared_profile(RuntimeProfile* sink_profile) {
        _spill_file_total_count =
                ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileTotalCount", TUnit::UNIT, 1);
        _spill_write_file_total_size =
                ADD_COUNTER_WITH_LEVEL(sink_profile, "SpillWriteFileBytes", TUnit::BYTES, 1);
    }

    // Lets each concrete spill shared state propagate its per-stream counters
    // onto the source operator's profile.
    virtual void update_spill_stream_profiles(RuntimeProfile* source_profile) = 0;
};
| 442 |  |  | 
| 443 |  | struct AggSpillPartition; | 
| 444 |  | struct PartitionedAggSharedState : public BasicSharedState, | 
| 445 |  |                                    public BasicSpillSharedState, | 
| 446 |  |                                    public std::enable_shared_from_this<PartitionedAggSharedState> { | 
| 447 |  |     ENABLE_FACTORY_CREATOR(PartitionedAggSharedState) | 
| 448 |  |  | 
| 449 | 12 |     PartitionedAggSharedState() = default; | 
| 450 | 12 |     ~PartitionedAggSharedState() override = default; | 
| 451 |  |  | 
| 452 |  |     void update_spill_stream_profiles(RuntimeProfile* source_profile) override; | 
| 453 |  |  | 
| 454 |  |     void init_spill_params(size_t spill_partition_count); | 
| 455 |  |  | 
| 456 |  |     void close(); | 
| 457 |  |  | 
| 458 |  |     AggSharedState* in_mem_shared_state = nullptr; | 
| 459 |  |     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; | 
| 460 |  |  | 
| 461 |  |     size_t partition_count; | 
| 462 |  |     size_t max_partition_index; | 
| 463 |  |     bool is_spilled = false; | 
| 464 |  |     std::atomic_bool is_closed = false; | 
| 465 |  |     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions; | 
| 466 |  |  | 
| 467 | 1.04M |     size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; } | 
| 468 |  | }; | 
| 469 |  |  | 
| 470 |  | struct AggSpillPartition { | 
| 471 |  |     static constexpr int64_t AGG_SPILL_FILE_SIZE = 1024 * 1024 * 1024; // 1G | 
| 472 |  |  | 
| 473 | 352 |     AggSpillPartition() = default; | 
| 474 |  |  | 
| 475 |  |     void close(); | 
| 476 |  |  | 
| 477 |  |     Status get_spill_stream(RuntimeState* state, int node_id, RuntimeProfile* profile, | 
| 478 |  |                             vectorized::SpillStreamSPtr& spilling_stream); | 
| 479 |  |  | 
| 480 | 64 |     Status flush_if_full() { | 
| 481 | 64 |         DCHECK(spilling_stream_); | 
| 482 | 64 |         Status status; | 
| 483 |  |         // avoid small spill files | 
| 484 | 64 |         if (spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) { | 
| 485 | 0 |             status = spilling_stream_->spill_eof(); | 
| 486 | 0 |             spilling_stream_.reset(); | 
| 487 | 0 |         } | 
| 488 | 64 |         return status; | 
| 489 | 64 |     } | 
| 490 |  |  | 
| 491 | 352 |     Status finish_current_spilling(bool eos = false) { | 
| 492 | 352 |         if (spilling_stream_) { | 
| 493 | 100 |             if (eos || spilling_stream_->get_written_bytes() >= AGG_SPILL_FILE_SIZE) { | 
| 494 | 48 |                 auto status = spilling_stream_->spill_eof(); | 
| 495 | 48 |                 spilling_stream_.reset(); | 
| 496 | 48 |                 return status; | 
| 497 | 48 |             } | 
| 498 | 100 |         } | 
| 499 | 304 |         return Status::OK(); | 
| 500 | 352 |     } | 
| 501 |  |  | 
| 502 |  |     std::deque<vectorized::SpillStreamSPtr> spill_streams_; | 
| 503 |  |     vectorized::SpillStreamSPtr spilling_stream_; | 
| 504 |  | }; | 
| 505 |  | using AggSpillPartitionSPtr = std::shared_ptr<AggSpillPartition>; | 
struct SortSharedState : public BasicSharedState {
    ENABLE_FACTORY_CREATOR(SortSharedState)
public:
    // Sorter shared between the sort sink (which feeds blocks in) and the
    // sort source (which reads the sorted output).
    std::shared_ptr<vectorized::Sorter> sorter;
};
| 511 |  |  | 
| 512 |  | struct SpillSortSharedState : public BasicSharedState, | 
| 513 |  |                               public BasicSpillSharedState, | 
| 514 |  |                               public std::enable_shared_from_this<SpillSortSharedState> { | 
| 515 |  |     ENABLE_FACTORY_CREATOR(SpillSortSharedState) | 
| 516 |  |  | 
| 517 | 10 |     SpillSortSharedState() = default; | 
| 518 | 10 |     ~SpillSortSharedState() override = default; | 
| 519 |  |  | 
| 520 | 5 |     void update_spill_block_batch_row_count(RuntimeState* state, const vectorized::Block* block) { | 
| 521 | 5 |         auto rows = block->rows(); | 
| 522 | 5 |         if (rows > 0 && 0 == avg_row_bytes) { | 
| 523 | 4 |             avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows); | 
| 524 | 4 |             spill_block_batch_row_count = | 
| 525 | 4 |                     (state->spill_sort_batch_bytes() + avg_row_bytes - 1) / avg_row_bytes; | 
| 526 | 4 |             LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count; | 
| 527 | 4 |         } | 
| 528 | 5 |     } | 
| 529 |  |  | 
| 530 |  |     void update_spill_stream_profiles(RuntimeProfile* source_profile) override; | 
| 531 |  |  | 
| 532 |  |     void close(); | 
| 533 |  |  | 
| 534 |  |     SortSharedState* in_mem_shared_state = nullptr; | 
| 535 |  |     bool enable_spill = false; | 
| 536 |  |     bool is_spilled = false; | 
| 537 |  |     int64_t limit = -1; | 
| 538 |  |     int64_t offset = 0; | 
| 539 |  |     std::atomic_bool is_closed = false; | 
| 540 |  |     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; | 
| 541 |  |  | 
| 542 |  |     std::deque<vectorized::SpillStreamSPtr> sorted_streams; | 
| 543 |  |     size_t avg_row_bytes = 0; | 
| 544 |  |     size_t spill_block_batch_row_count; | 
| 545 |  | }; | 
| 546 |  |  | 
| 547 |  | struct UnionSharedState : public BasicSharedState { | 
| 548 |  |     ENABLE_FACTORY_CREATOR(UnionSharedState) | 
| 549 |  |  | 
| 550 |  | public: | 
| 551 | 1 |     UnionSharedState(int child_count = 1) : data_queue(child_count), _child_count(child_count) {}; | 
| 552 | 0 |     int child_count() const { return _child_count; } | 
| 553 |  |     DataQueue data_queue; | 
| 554 |  |     const int _child_count; | 
| 555 |  | }; | 
| 556 |  |  | 
struct DataQueueSharedState : public BasicSharedState {
    ENABLE_FACTORY_CREATOR(DataQueueSharedState)
public:
    // Queue of blocks handed from the sink side to the source side.
    DataQueue data_queue;
};
| 562 |  |  | 
| 563 |  | class MultiCastDataStreamer; | 
| 564 |  |  | 
// Shared state of the multi-cast local exchange: one input stream fanned out
// to `cast_sender_count` consumers, with spill support.
struct MultiCastSharedState : public BasicSharedState,
                              public BasicSpillSharedState,
                              public std::enable_shared_from_this<MultiCastSharedState> {
    MultiCastSharedState(ObjectPool* pool, int cast_sender_count, int node_id);
    std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;

    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
};
| 573 |  |  | 
struct AnalyticSharedState : public BasicSharedState {
    ENABLE_FACTORY_CREATOR(AnalyticSharedState)

public:
    AnalyticSharedState() = default;
    // Blocks buffered between the analytic sink and source; guarded by buffer_mutex.
    std::queue<vectorized::Block> blocks_buffer;
    std::mutex buffer_mutex;
    // Set once the sink has consumed all of its input; guarded by sink_eos_lock.
    bool sink_eos = false;
    std::mutex sink_eos_lock;
};
| 584 |  |  | 
struct JoinSharedState : public BasicSharedState {
    // For some join cases we can apply a short-circuit strategy:
    // 1. _has_null_in_build_side = true
    // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti
    bool _has_null_in_build_side = false;
    bool short_circuit_for_probe = false;
    // For some joins, when the build side is empty, we can return directly by adding
    // some additional null data in the probe table.
    bool empty_right_table_need_probe_dispose = false;
    JoinOpVariants join_op_variants;
};
| 595 |  |  | 
| 596 |  | struct HashJoinSharedState : public JoinSharedState { | 
| 597 |  |     ENABLE_FACTORY_CREATOR(HashJoinSharedState) | 
| 598 | 72.1k |     HashJoinSharedState() { | 
| 599 | 72.1k |         hash_table_variant_vector.push_back(std::make_shared<JoinDataVariants>()); | 
| 600 | 72.1k |     } | 
| 601 | 1 |     HashJoinSharedState(int num_instances) { | 
| 602 | 1 |         source_deps.resize(num_instances, nullptr); | 
| 603 | 1 |         hash_table_variant_vector.resize(num_instances, nullptr); | 
| 604 | 9 |         for (int i = 0; i < num_instances; i++) { | 
| 605 | 8 |             hash_table_variant_vector[i] = std::make_shared<JoinDataVariants>(); | 
| 606 | 8 |         } | 
| 607 | 1 |     } | 
| 608 |  |     std::shared_ptr<vectorized::Arena> arena = std::make_shared<vectorized::Arena>(); | 
| 609 |  |  | 
| 610 |  |     const std::vector<TupleDescriptor*> build_side_child_desc; | 
| 611 |  |     size_t build_exprs_size = 0; | 
| 612 |  |     std::shared_ptr<vectorized::Block> build_block; | 
| 613 |  |     std::shared_ptr<std::vector<uint32_t>> build_indexes_null; | 
| 614 |  |  | 
| 615 |  |     // Used by shared hash table | 
| 616 |  |     // For probe operator, hash table in _hash_table_variants is read-only if visited flags is not | 
| 617 |  |     // used. (visited flags will be used only in right / full outer join). | 
| 618 |  |     // | 
| 619 |  |     // For broadcast join, although hash table is read-only, some states in `_hash_table_variants` | 
| 620 |  |     // are still could be written. For example, serialized keys will be written in a continuous | 
| 621 |  |     // memory in `_hash_table_variants`. So before execution, we should use a local _hash_table_variants | 
| 622 |  |     // which has a shared hash table in it. | 
| 623 |  |     std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector; | 
| 624 |  | }; | 
| 625 |  |  | 
| 626 |  | struct PartitionedHashJoinSharedState | 
| 627 |  |         : public HashJoinSharedState, | 
| 628 |  |           public BasicSpillSharedState, | 
| 629 |  |           public std::enable_shared_from_this<PartitionedHashJoinSharedState> { | 
| 630 |  |     ENABLE_FACTORY_CREATOR(PartitionedHashJoinSharedState) | 
| 631 |  |  | 
| 632 | 0 |     void update_spill_stream_profiles(RuntimeProfile* source_profile) override { | 
| 633 | 0 |         for (auto& stream : spilled_streams) { | 
| 634 | 0 |             if (stream) { | 
| 635 | 0 |                 stream->update_shared_profiles(source_profile); | 
| 636 | 0 |             } | 
| 637 | 0 |         } | 
| 638 | 0 |     } | 
| 639 |  |  | 
| 640 |  |     std::unique_ptr<RuntimeState> inner_runtime_state; | 
| 641 |  |     std::shared_ptr<HashJoinSharedState> inner_shared_state; | 
| 642 |  |     std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks; | 
| 643 |  |     std::vector<vectorized::SpillStreamSPtr> spilled_streams; | 
| 644 |  |     bool is_spilled = false; | 
| 645 |  | }; | 
| 646 |  |  | 
struct NestedLoopJoinSharedState : public JoinSharedState {
    ENABLE_FACTORY_CREATOR(NestedLoopJoinSharedState)
    // If true, the left child has no more rows to process.
    bool left_side_eos = false;
    // Visited flags for each row in the build side.
    vectorized::MutableColumns build_side_visited_flags;
    // List of build blocks, constructed in prepare().
    vectorized::Blocks build_blocks;
};
| 656 |  |  | 
struct PartitionSortNodeSharedState : public BasicSharedState {
    ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState)
public:
    // Blocks buffered between sink and source; guarded by buffer_mutex.
    std::queue<vectorized::Block> blocks_buffer;
    std::mutex buffer_mutex;
    // One sorter per partition.
    std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
    // Set once the sink side is done; guarded by sink_eos_lock.
    bool sink_eos = false;
    std::mutex sink_eos_lock;
    std::mutex prepared_finish_lock;
};
| 667 |  |  | 
| 668 |  | struct SetSharedState : public BasicSharedState { | 
| 669 |  |     ENABLE_FACTORY_CREATOR(SetSharedState) | 
| 670 |  | public: | 
| 671 |  |     /// default init | 
| 672 |  |     vectorized::Block build_block; // build to source | 
| 673 |  |     //record element size in hashtable | 
| 674 |  |     int64_t valid_element_in_hash_tbl = 0; | 
| 675 |  |     //first: idx mapped to column types | 
| 676 |  |     //second: column_id, could point to origin column or cast column | 
| 677 |  |     std::unordered_map<int, int> build_col_idx; | 
| 678 |  |  | 
| 679 |  |     //// shared static states (shared, decided in prepare/open...) | 
| 680 |  |  | 
| 681 |  |     /// init in setup_local_state | 
| 682 |  |     std::unique_ptr<SetDataVariants> hash_table_variants = | 
| 683 |  |             std::make_unique<SetDataVariants>(); // the real data HERE. | 
| 684 |  |     std::vector<bool> build_not_ignore_null; | 
| 685 |  |  | 
| 686 |  |     // The SET operator's child might have different nullable attributes. | 
| 687 |  |     // If a calculation involves both nullable and non-nullable columns, the final output should be a nullable column | 
| 688 |  |     Status update_build_not_ignore_null(const vectorized::VExprContextSPtrs& ctxs); | 
| 689 |  |  | 
| 690 |  |     size_t get_hash_table_size() const; | 
| 691 |  |     /// init in both upstream side. | 
| 692 |  |     //The i-th result expr list refers to the i-th child. | 
| 693 |  |     std::vector<vectorized::VExprContextSPtrs> child_exprs_lists; | 
| 694 |  |  | 
| 695 |  |     /// init in build side | 
| 696 |  |     size_t child_quantity; | 
| 697 |  |     vectorized::VExprContextSPtrs build_child_exprs; | 
| 698 |  |     std::vector<Dependency*> probe_finished_children_dependency; | 
| 699 |  |  | 
| 700 |  |     /// init in probe side | 
| 701 |  |     std::vector<vectorized::VExprContextSPtrs> probe_child_exprs_lists; | 
| 702 |  |  | 
| 703 |  |     std::atomic<bool> ready_for_read = false; | 
| 704 |  |  | 
| 705 |  |     /// called in setup_local_state | 
| 706 |  |     Status hash_table_init(); | 
| 707 |  | }; | 
| 708 |  |  | 
// How a local exchange redistributes blocks between pipeline instances.
enum class ExchangeType : uint8_t {
    NOOP = 0,
    // Shuffle data by Crc32HashPartitioner<LocalExchangeChannelIds>.
    HASH_SHUFFLE = 1,
    // Round-robin passthrough data blocks.
    PASSTHROUGH = 2,
    // Shuffle data by Crc32HashPartitioner<ShuffleChannelIds> (e.g. same as storage engine).
    BUCKET_HASH_SHUFFLE = 3,
    // Passthrough data blocks to all channels.
    BROADCAST = 4,
    // Passthrough data to channels evenly in an adaptive way.
    ADAPTIVE_PASSTHROUGH = 5,
    // Send all data to the first channel.
    PASS_TO_ONE = 6,
};
| 724 |  |  | 
// Returns the human-readable name of `idx` for logs and profiles.
// The switch is intentionally exhaustive with no `default` so the compiler
// warns when a new ExchangeType is added; the trailing throw is unreachable
// for any valid enumerator.
inline std::string get_exchange_type_name(ExchangeType idx) {
    switch (idx) {
    case ExchangeType::NOOP:
        return "NOOP";
    case ExchangeType::HASH_SHUFFLE:
        return "HASH_SHUFFLE";
    case ExchangeType::PASSTHROUGH:
        return "PASSTHROUGH";
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        return "BUCKET_HASH_SHUFFLE";
    case ExchangeType::BROADCAST:
        return "BROADCAST";
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        return "ADAPTIVE_PASSTHROUGH";
    case ExchangeType::PASS_TO_ONE:
        return "PASS_TO_ONE";
    }
    throw Exception(Status::FatalError("__builtin_unreachable"));
}
| 744 |  |  | 
// Describes how an operator wants its input distributed across local exchange
// channels (see ExchangeType).
struct DataDistribution {
    DataDistribution(ExchangeType type) : distribution_type(type) {}
    DataDistribution(ExchangeType type, const std::vector<TExpr>& partition_exprs_)
            : distribution_type(type), partition_exprs(partition_exprs_) {}
    DataDistribution(const DataDistribution& other) = default;
    // NOOP means data may stay where it is; anything else needs a local exchange.
    bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
    DataDistribution& operator=(const DataDistribution& other) = default;
    ExchangeType distribution_type;
    // Partition expressions; only meaningful for hash-based distribution types.
    std::vector<TExpr> partition_exprs;
};
| 755 |  |  | 
| 756 |  | class ExchangerBase; | 
| 757 |  |  | 
| 758 |  | struct LocalExchangeSharedState : public BasicSharedState { | 
| 759 |  | public: | 
| 760 |  |     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState); | 
| 761 |  |     LocalExchangeSharedState(int num_instances); | 
| 762 |  |     ~LocalExchangeSharedState() override; | 
| 763 |  |     std::unique_ptr<ExchangerBase> exchanger {}; | 
| 764 |  |     std::vector<RuntimeProfile::Counter*> mem_counters; | 
| 765 |  |     std::atomic<int64_t> mem_usage = 0; | 
| 766 |  |     std::atomic<size_t> _buffer_mem_limit = config::local_exchange_buffer_mem_limit; | 
| 767 |  |     // We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue. | 
| 768 |  |     std::mutex le_lock; | 
| 769 |  |     void sub_running_sink_operators(); | 
| 770 |  |     void sub_running_source_operators(); | 
| 771 | 10 |     void _set_always_ready() { | 
| 772 | 40 |         for (auto& dep : source_deps) { | 
| 773 | 40 |             DCHECK(dep); | 
| 774 | 40 |             dep->set_always_ready(); | 
| 775 | 40 |         } | 
| 776 | 10 |         for (auto& dep : sink_deps) { | 
| 777 | 10 |             DCHECK(dep); | 
| 778 | 10 |             dep->set_always_ready(); | 
| 779 | 10 |         } | 
| 780 | 10 |     } | 
| 781 |  |  | 
| 782 | 0 |     Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; } | 
| 783 |  |  | 
| 784 | 129 |     void set_ready_to_read(int channel_id) { | 
| 785 | 129 |         auto& dep = source_deps[channel_id]; | 
| 786 | 129 |         DCHECK(dep) << channel_id; | 
| 787 | 129 |         dep->set_ready(); | 
| 788 | 129 |     } | 
| 789 |  |  | 
| 790 | 161 |     void add_mem_usage(int channel_id, size_t delta) { mem_counters[channel_id]->update(delta); } | 
| 791 |  |  | 
| 792 | 125 |     void sub_mem_usage(int channel_id, size_t delta) { | 
| 793 | 125 |         mem_counters[channel_id]->update(-(int64_t)delta); | 
| 794 | 125 |     } | 
| 795 |  |  | 
| 796 | 114 |     void add_total_mem_usage(size_t delta) { | 
| 797 | 114 |         if (cast_set<int64_t>(mem_usage.fetch_add(delta) + delta) > _buffer_mem_limit) { | 
| 798 | 15 |             sink_deps.front()->block(); | 
| 799 | 15 |         } | 
| 800 | 114 |     } | 
| 801 |  |  | 
| 802 | 114 |     void sub_total_mem_usage(size_t delta) { | 
| 803 | 114 |         auto prev_usage = mem_usage.fetch_sub(delta); | 
| 804 | 114 |         DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " delta: " << delta; | 
| 805 | 114 |         if (cast_set<int64_t>(prev_usage - delta) <= _buffer_mem_limit) { | 
| 806 | 102 |             sink_deps.front()->set_ready(); | 
| 807 | 102 |         } | 
| 808 | 114 |     } | 
| 809 |  |  | 
| 810 | 0 |     void set_low_memory_mode(RuntimeState* state) { | 
| 811 | 0 |         _buffer_mem_limit = std::min<int64_t>(config::local_exchange_buffer_mem_limit, | 
| 812 | 0 |                                               state->low_memory_mode_buffer_limit()); | 
| 813 | 0 |     } | 
| 814 |  | }; | 
| 815 |  |  | 
| 816 |  | #include "common/compile_check_end.h" | 
| 817 |  | } // namespace doris::pipeline |