Coverage Report

Created: 2026-03-27 15:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/fragment_mgr.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <brpc/closure_guard.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/QueryPlanExtra_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>

#include <cstdint>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/be_mock_util.h"
#include "common/metrics/metrics.h"
#include "common/status.h"
#include "exec/runtime_filter/runtime_filter_mgr.h"
#include "runtime/query_context.h"
#include "service/http/rest_monitor_iface.h"
#include "util/countdown_latch.h"
#include "util/hash_util.hpp" // IWYU pragma: keep
45
46
namespace butil {
47
class IOBufAsZeroCopyInputStream;
48
}
49
50
namespace doris {
51
#include "common/compile_check_begin.h"
52
extern bvar::Adder<uint64_t> g_fragment_executing_count;
53
extern bvar::Status<uint64_t> g_fragment_last_active_time;
54
55
class PipelineFragmentContext;
56
class QueryContext;
57
class ExecEnv;
58
class ThreadPool;
59
class PExecPlanFragmentStartRequest;
60
class PMergeFilterRequest;
61
class RuntimeProfile;
62
class RuntimeState;
63
class TPipelineFragmentParams;
64
class TPipelineInstanceParams;
65
class TScanColumnDesc;
66
class TScanOpenParams;
67
class Thread;
68
class WorkloadQueryInfo;
69
70
std::string to_load_error_http_path(const std::string& file_name);
71
72
template <typename Key, typename Value, typename ValueType>
73
class ConcurrentContextMap {
74
public:
75
    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
76
    ConcurrentContextMap();
77
    Value find(const Key& query_id);
78
    void insert(const Key& query_id, std::shared_ptr<ValueType>);
79
    void clear();
80
    bool erase(const Key& query_id);
81
770
    size_t num_items() const {
82
770
        size_t n = 0;
83
98.5k
        for (auto& pair : _internal_map) {
84
98.5k
            std::shared_lock lock(*pair.first);
85
98.5k
            auto& map = pair.second;
86
98.5k
            n += map.size();
87
98.5k
        }
88
770
        return n;
89
770
    }
_ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E9num_itemsEv
Line
Count
Source
81
698
    size_t num_items() const {
82
698
        size_t n = 0;
83
89.3k
        for (auto& pair : _internal_map) {
84
89.3k
            std::shared_lock lock(*pair.first);
85
89.3k
            auto& map = pair.second;
86
89.3k
            n += map.size();
87
89.3k
        }
88
698
        return n;
89
698
    }
_ZNK5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E9num_itemsEv
Line
Count
Source
81
72
    size_t num_items() const {
82
72
        size_t n = 0;
83
9.21k
        for (auto& pair : _internal_map) {
84
9.21k
            std::shared_lock lock(*pair.first);
85
9.21k
            auto& map = pair.second;
86
9.21k
            n += map.size();
87
9.21k
        }
88
72
        return n;
89
72
    }
90
30.6k
    void apply(ApplyFunction&& function) {
91
3.92M
        for (auto& pair : _internal_map) {
92
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
93
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
94
3.92M
            std::unique_lock lock(*pair.first);
95
3.92M
            static_cast<void>(function(pair.second));
96
3.92M
        }
97
30.6k
    }
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_23PipelineFragmentContextEES5_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS3_S6_NSA_4HashIS3_EENSA_7EqualToIS3_EESaIS1_IKS3_S6_EEEEEE
Line
Count
Source
90
6.22k
    void apply(ApplyFunction&& function) {
91
796k
        for (auto& pair : _internal_map) {
92
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
93
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
94
796k
            std::unique_lock lock(*pair.first);
95
796k
            static_cast<void>(function(pair.second));
96
796k
        }
97
6.22k
    }
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE
Line
Count
Source
90
24.4k
    void apply(ApplyFunction&& function) {
91
3.12M
        for (auto& pair : _internal_map) {
92
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
93
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
94
3.12M
            std::unique_lock lock(*pair.first);
95
3.12M
            static_cast<void>(function(pair.second));
96
3.12M
        }
97
24.4k
    }
98
99
    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
100
                               ApplyFunction&& function);
101
102
private:
103
    // The lock should only be used to protect the structures in fragment manager. Has to be
104
    // used in a very small scope because it may dead lock. For example, if the _lock is used
105
    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
106
    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
107
    // call _lock, so that there is dead lock.
108
    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
109
            _internal_map;
110
};
111
112
// This class used to manage all the fragment execute in this instance
113
class FragmentMgr : public RestMonitorIface {
114
public:
115
    using FinishCallback = std::function<void(RuntimeState*, Status*)>;
116
117
    FragmentMgr(ExecEnv* exec_env);
118
    ~FragmentMgr() override;
119
120
    void stop();
121
122
    // execute one plan fragment
123
124
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
125
                              const TPipelineFragmentParamsList& parent);
126
127
    void remove_pipeline_context(std::pair<TUniqueId, int> key);
128
    void remove_query_context(const TUniqueId& key);
129
130
    // `is_prepare_success` is used by invoker to ensure callback can be handle correctly (eg. stream_load_executor)
131
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
132
                              const FinishCallback& cb, const TPipelineFragmentParamsList& parent,
133
                              std::shared_ptr<bool> is_prepare_success = nullptr);
134
135
    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
136
137
    Status trigger_pipeline_context_report(const ReportStatusRequest,
138
                                           std::shared_ptr<PipelineFragmentContext>&&);
139
140
    // Can be used in both version.
141
    MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason);
142
143
    void cancel_worker();
144
145
    void debug(std::stringstream& ss) override;
146
147
    // input: TQueryPlanInfo fragment_instance_id
148
    // output: selected_columns
149
    // execute external query, all query info are packed in TScanOpenParams
150
    Status exec_external_plan_fragment(const TScanOpenParams& params,
151
                                       const TQueryPlanInfo& t_query_plan_info,
152
                                       const TUniqueId& query_id,
153
                                       const TUniqueId& fragment_instance_id,
154
                                       std::vector<TScanColumnDesc>* selected_columns);
155
156
    Status apply_filterv2(const PPublishFilterRequestV2* request,
157
                          butil::IOBufAsZeroCopyInputStream* attach_data);
158
159
    Status merge_filter(const PMergeFilterRequest* request,
160
                        butil::IOBufAsZeroCopyInputStream* attach_data);
161
162
    Status send_filter_size(const PSendFilterSizeRequest* request);
163
164
    Status sync_filter_size(const PSyncFilterSizeRequest* request);
165
166
    std::string to_http_path(const std::string& file_name);
167
168
    void coordinator_callback(const ReportStatusRequest& req);
169
170
77.5k
    ThreadPool* get_thread_pool() { return _thread_pool.get(); }
171
172
    // When fragment mgr is going to stop, the _stop_background_threads_latch is set to 0
173
    // and other module that use fragment mgr's thread pool should get this signal and exit.
174
172k
    bool shutting_down() { return _stop_background_threads_latch.count() == 0; }
175
176
470
    int32_t running_query_num() { return cast_set<int32_t>(_query_ctx_map.num_items()); }
177
178
    std::string dump_pipeline_tasks(int64_t duration = 0);
179
    std::string dump_pipeline_tasks(TUniqueId& query_id);
180
181
    void get_runtime_query_info(std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list);
182
183
    Status get_realtime_exec_status(const TUniqueId& query_id,
184
                                    TReportExecStatusParams* exec_status);
185
    // get the query statistics of with a given query id
186
    Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats);
187
188
    std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
189
190
    Status transmit_rec_cte_block(const TUniqueId& query_id, const TUniqueId& instance_id,
191
                                  int node_id,
192
                                  const google::protobuf::RepeatedPtrField<PBlock>& pblocks,
193
                                  bool eos);
194
195
    Status rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
196
                          const TUniqueId& query_id, int fragment,
197
                          PRerunFragmentParams_Opcode stage);
198
199
    Status reset_global_rf(const TUniqueId& query_id,
200
                           const google::protobuf::RepeatedField<int32_t>& filter_ids);
201
202
private:
203
    struct BrpcItem {
204
        TNetworkAddress network_address;
205
        std::vector<std::weak_ptr<QueryContext>> queries;
206
    };
207
208
    Status _get_or_create_query_ctx(const TPipelineFragmentParams& params,
209
                                    const TPipelineFragmentParamsList& parent,
210
                                    QuerySource query_type,
211
                                    std::shared_ptr<QueryContext>& query_ctx);
212
213
    void _collect_timeout_queries_and_brpc_items(
214
            std::vector<TUniqueId>& queries_timeout,
215
            std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>&
216
                    brpc_stub_with_queries,
217
            timespec now);
218
219
    void _collect_invalid_queries(
220
            std::vector<TUniqueId>& queries_lost_coordinator,
221
            std::vector<TUniqueId>& queries_pipeline_task_leak,
222
            const std::map<int64_t, std::unordered_set<TUniqueId>>& running_queries_on_all_fes,
223
            const std::map<TNetworkAddress, FrontendInfo>& running_fes,
224
            timespec check_invalid_query_last_timestamp);
225
226
    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
227
                               const BrpcItem& brpc_item);
228
229
    // This is input params
230
    ExecEnv* _exec_env = nullptr;
231
232
    // (QueryID, FragmentID) -> PipelineFragmentContext
233
    ConcurrentContextMap<std::pair<TUniqueId, int>, std::shared_ptr<PipelineFragmentContext>,
234
                         PipelineFragmentContext>
235
            _pipeline_map;
236
237
    // Saved params and callback for rerunnable (recursive CTE) fragments.
238
    // Only populated when need_notify_close == true during exec_plan_fragment.
239
    // Lifecycle: created in exec_plan_fragment(), used in rerun_fragment(rebuild)
240
    // to recreate PFC with fresh state, cleaned up in remove_query_context().
241
    struct RerunableFragmentInfo {
242
        // Runtime filter IDs registered by the old PFC, collected during wait_for_destroy.
243
        // These are deregistered from the RuntimeFilterMgr before the new PFC is created.
244
        std::set<int> deregister_runtime_filter_ids;
245
        // Original params from FE, used to recreate the PFC each round.
246
        TPipelineFragmentParams params;
247
        TPipelineFragmentParamsList parent;
248
        FinishCallback finish_callback;
249
        // Hold query_ctx to prevent it from being destroyed while rerunnable fragments exist.
250
        std::shared_ptr<QueryContext> query_ctx;
251
        // Monotonically increasing stage counter, stamps runtime filter RPCs.
252
        uint32_t stage = 0;
253
    };
254
    std::mutex _rerunnable_params_lock;
255
    std::map<std::pair<TUniqueId, int>, RerunableFragmentInfo> _rerunnable_params_map;
256
257
    // query id -> QueryContext
258
    ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
259
    // keep query ctx do not delete immediately to make rf coordinator merge filter work well after query eos
260
    ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext>
261
            _query_ctx_map_delay_delete;
262
263
    CountDownLatch _stop_background_threads_latch;
264
    std::shared_ptr<Thread> _cancel_thread;
265
    // This pool is used as global async task pool
266
    std::unique_ptr<ThreadPool> _thread_pool;
267
268
    std::shared_ptr<MetricEntity> _entity;
269
    UIntGauge* timeout_canceled_fragment_count = nullptr;
270
};
271
272
uint64_t get_fragment_executing_count();
273
uint64_t get_fragment_last_active_time();
274
#include "common/compile_check_end.h"
275
} // namespace doris