Coverage Report

Created: 2025-07-26 00:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/runtime/fragment_mgr.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/FrontendService_types.h>
21
#include <gen_cpp/QueryPlanExtra_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <cstdint>
26
#include <functional>
27
#include <iosfwd>
28
#include <memory>
29
#include <mutex>
30
#include <string>
31
#include <unordered_map>
32
#include <vector>
33
34
#include "common/be_mock_util.h"
35
#include "common/status.h"
36
#include "gutil/ref_counted.h"
37
#include "http/rest_monitor_iface.h"
38
#include "runtime/query_context.h"
39
#include "runtime_filter/runtime_filter_mgr.h"
40
#include "util/countdown_latch.h"
41
#include "util/hash_util.hpp" // IWYU pragma: keep
42
#include "util/metrics.h"
43
44
namespace butil {
45
class IOBufAsZeroCopyInputStream;
46
}
47
48
namespace doris {
49
#include "common/compile_check_begin.h"
50
extern bvar::Adder<uint64_t> g_fragment_executing_count;
51
extern bvar::Status<uint64_t> g_fragment_last_active_time;
52
53
namespace pipeline {
54
class PipelineFragmentContext;
55
} // namespace pipeline
56
class QueryContext;
57
class ExecEnv;
58
class ThreadPool;
59
class TExecPlanFragmentParams;
60
class PExecPlanFragmentStartRequest;
61
class PMergeFilterRequest;
62
class RuntimeProfile;
63
class RuntimeState;
64
class TPipelineFragmentParams;
65
class TPipelineInstanceParams;
66
class TScanColumnDesc;
67
class TScanOpenParams;
68
class Thread;
69
class WorkloadQueryInfo;
70
71
std::string to_load_error_http_path(const std::string& file_name);
72
73
// A map that is partitioned into several sub-maps. Each sub-map is stored together with its
// own std::shared_mutex (see _internal_map), and the inline methods below lock one sub-map
// at a time rather than the whole container.
// find/insert/clear/erase/apply_if_not_exists and the constructor are only declared here;
// their definitions are not part of this header.
template <typename Key, typename Value, typename ValueType>
74
class ConcurrentContextMap {
75
public:
76
    // Callback type accepted by apply() and apply_if_not_exists(); it receives one sub-map
    // by mutable reference and returns a Status.
    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
77
    ConcurrentContextMap();
78
    Value find(const Key& query_id);
79
    void insert(const Key& query_id, std::shared_ptr<ValueType>);
80
    void clear();
81
    void erase(const Key& query_id);
82
19
    // Returns the total number of entries summed over all sub-maps.
    // Each sub-map is read under its own shared lock, and that lock is released before the
    // next sub-map is visited, so the sum is not one consistent snapshot of the whole map.
    size_t num_items() const {
83
19
        size_t n = 0;
84
2.43k
        for (auto& pair : _internal_map) {
85
2.43k
            std::shared_lock lock(*pair.first);
86
2.43k
            auto& map = pair.second;
87
2.43k
            n += map.size();
88
2.43k
        }
89
19
        return n;
90
19
    }
_ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E9num_itemsEv
Line
Count
Source
82
19
    size_t num_items() const {
83
19
        size_t n = 0;
84
2.43k
        for (auto& pair : _internal_map) {
85
2.43k
            std::shared_lock lock(*pair.first);
86
2.43k
            auto& map = pair.second;
87
2.43k
            n += map.size();
88
2.43k
        }
89
19
        return n;
90
19
    }
Unexecuted instantiation: _ZNK5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES6_E9num_itemsEv
91
57
    // Invokes `function` once per sub-map while holding that sub-map's exclusive lock.
    // The Status returned by `function` is explicitly discarded, so a failure reported by
    // the callback does not stop the iteration and is not visible to the caller.
    void apply(ApplyFunction&& function) {
92
7.29k
        for (auto& pair : _internal_map) {
93
            // TODO: Currently only the cancel worker GCs the _query_ctx_map. Each query must
94
            // erase the finished query unless in _query_ctx_map. Rethink whether this logic is ok.
95
7.29k
            std::unique_lock lock(*pair.first);
96
7.29k
            static_cast<void>(function(pair.second));
97
7.29k
        }
98
57
    }
_ZN5doris20ConcurrentContextMapISt4pairINS_9TUniqueIdEiESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES6_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS3_S7_NSB_4HashIS3_EENSB_7EqualToIS3_EESaIS1_IKS3_S7_EEEEEE
Line
Count
Source
91
19
    void apply(ApplyFunction&& function) {
92
2.43k
        for (auto& pair : _internal_map) {
93
            // TODO: Currently only the cancel worker GCs the _query_ctx_map. Each query must
94
            // erase the finished query unless in _query_ctx_map. Rethink whether this logic is ok.
95
2.43k
            std::unique_lock lock(*pair.first);
96
2.43k
            static_cast<void>(function(pair.second));
97
2.43k
        }
98
19
    }
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt8weak_ptrINS_12QueryContextEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE
Line
Count
Source
91
38
    void apply(ApplyFunction&& function) {
92
4.86k
        for (auto& pair : _internal_map) {
93
            // TODO: Currently only the cancel worker GCs the _query_ctx_map. Each query must
94
            // erase the finished query unless in _query_ctx_map. Rethink whether this logic is ok.
95
4.86k
            std::unique_lock lock(*pair.first);
96
4.86k
            static_cast<void>(function(pair.second));
97
4.86k
        }
98
38
    }
99
100
    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
101
                               ApplyFunction&& function);
102
103
private:
104
    // The lock should only be used to protect the structures in fragment manager. It has to be
105
    // used in a very small scope because it may deadlock. For example, if the _lock is used
106
    // in the prepare stage, the call path is prepare --> expr prepare --> may call allocator;
107
    // when allocation fails, the allocator may call query_is_cancelled, which will also
108
    // acquire _lock, so there is a deadlock.
    //
    // NOTE(review): no member named _lock is visible in this class; the comment above
    // presumably refers to the per-sub-map mutexes stored below -- confirm.
    // Each element owns one mutex (via unique_ptr) and the sub-map that mutex guards.
109
    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
110
            _internal_map;
111
};
112
113
// This class is used to manage all the fragments executing in this instance.
114
class FragmentMgr : public RestMonitorIface {
115
public:
116
    // Callback signature accepted by the exec_plan_fragment overloads that take `cb`.
    using FinishCallback = std::function<void(RuntimeState*, Status*)>;
117
118
    FragmentMgr(ExecEnv* exec_env);
119
    ~FragmentMgr() override;
120
121
    void stop();
122
123
    // Execute one plan fragment.
124
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type);
125
126
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
127
                              const TPipelineFragmentParamsList& parent);
128
129
    // The key has the same type as the key of _pipeline_map: (TUniqueId, int).
    void remove_pipeline_context(std::pair<TUniqueId, int> key);
130
131
    // TODO(zc): report this is over
132
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type,
133
                              const FinishCallback& cb);
134
135
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
136
                              const FinishCallback& cb, const TPipelineFragmentParamsList& parent);
137
138
    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
139
140
    Status trigger_pipeline_context_report(const ReportStatusRequest,
141
                                           std::shared_ptr<pipeline::PipelineFragmentContext>&&);
142
143
    // Can be used in both versions.
    // Note: both arguments are taken by value.
144
    MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason);
145
146
    void cancel_worker();
147
148
    void debug(std::stringstream& ss) override;
149
150
    // input: TQueryPlanInfo fragment_instance_id
151
    // output: selected_columns
152
    // Execute an external query; all query info is packed in TScanOpenParams.
153
    Status exec_external_plan_fragment(const TScanOpenParams& params,
154
                                       const TQueryPlanInfo& t_query_plan_info,
155
                                       const TUniqueId& query_id,
156
                                       const TUniqueId& fragment_instance_id,
157
                                       std::vector<TScanColumnDesc>* selected_columns);
158
159
    Status apply_filterv2(const PPublishFilterRequestV2* request,
160
                          butil::IOBufAsZeroCopyInputStream* attach_data);
161
162
    Status merge_filter(const PMergeFilterRequest* request,
163
                        butil::IOBufAsZeroCopyInputStream* attach_data);
164
165
    Status send_filter_size(const PSendFilterSizeRequest* request);
166
167
    Status sync_filter_size(const PSyncFilterSizeRequest* request);
168
169
    std::string to_http_path(const std::string& file_name);
170
171
    void coordinator_callback(const ReportStatusRequest& req);
172
173
0
    // Returns a non-owning pointer to the pool held by _thread_pool.
    ThreadPool* get_thread_pool() { return _thread_pool.get(); }
174
175
    // When fragment mgr is going to stop, the _stop_background_threads_latch is set to 0
176
    // and other modules that use fragment mgr's thread pool should get this signal and exit.
177
0
    bool shutting_down() { return _stop_background_threads_latch.count() == 0; }
178
179
0
    // Number of entries currently stored in _query_ctx_map, converted to int32_t via cast_set.
    int32_t running_query_num() { return cast_set<int32_t>(_query_ctx_map.num_items()); }
180
181
    std::string dump_pipeline_tasks(int64_t duration = 0);
182
    std::string dump_pipeline_tasks(TUniqueId& query_id);
183
184
    void get_runtime_query_info(std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list);
185
186
    Status get_realtime_exec_status(const TUniqueId& query_id,
187
                                    TReportExecStatusParams* exec_status);
188
    // Get the query statistics for a given query id.
189
    Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats);
190
191
    std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);
192
193
private:
194
    // A network address together with weak references to query contexts.
    // NOTE(review): presumably the queries that talk to that address -- confirm at the use site.
    struct BrpcItem {
195
        TNetworkAddress network_address;
196
        std::vector<std::weak_ptr<QueryContext>> queries;
197
    };
198
199
    Status _get_or_create_query_ctx(const TPipelineFragmentParams& params,
200
                                    const TPipelineFragmentParamsList& parent,
201
                                    QuerySource query_type,
202
                                    std::shared_ptr<QueryContext>& query_ctx);
203
204
    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
205
                               const BrpcItem& brpc_item);
206
207
    // This is an input param.
208
    ExecEnv* _exec_env = nullptr;
209
210
    // (QueryID, FragmentID) -> PipelineFragmentContext
211
    ConcurrentContextMap<std::pair<TUniqueId, int>,
212
                         std::shared_ptr<pipeline::PipelineFragmentContext>,
213
                         pipeline::PipelineFragmentContext>
214
            _pipeline_map;
215
216
    // query id -> QueryContext (held as weak_ptr, so this map does not own the contexts)
217
    ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> _query_ctx_map;
    // NOTE(review): the meaning of the keys/values is not visible in this header --
    // presumably query id -> (filter id -> size); confirm against the users of this map.
218
    std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
219
220
    CountDownLatch _stop_background_threads_latch;
221
    scoped_refptr<Thread> _cancel_thread;
222
    // This pool is used as a global async task pool.
223
    std::unique_ptr<ThreadPool> _thread_pool;
224
225
    std::shared_ptr<MetricEntity> _entity;
    // NOTE(review): unlike the other data members, this one has no leading underscore.
226
    UIntGauge* timeout_canceled_fragment_count = nullptr;
227
};
228
229
uint64_t get_fragment_executing_count();
230
uint64_t get_fragment_last_active_time();
231
#include "common/compile_check_end.h"
232
} // namespace doris