Coverage Report

Created: 2025-05-21 16:20

/root/doris/be/src/runtime/fragment_mgr.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <stdint.h>

#include <condition_variable>
#include <functional>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "gutil/ref_counted.h"
#include "http/rest_monitor_iface.h"
#include "runtime/query_context.h"
#include "runtime_filter_mgr.h"
#include "util/countdown_latch.h"
#include "util/hash_util.hpp" // IWYU pragma: keep
#include "util/metrics.h"
41
42
// Forward declaration only: this header passes the stream around by pointer
// (apply_filter, apply_filterv2, merge_filter), so the full butil definition is not needed.
namespace butil {
class IOBufAsZeroCopyInputStream;
} // namespace butil
45
46
namespace doris {
47
48
namespace pipeline {
class PipelineFragmentContext;
class PipelineXFragmentContext;
} // namespace pipeline

// Forward declarations, so pointers/references/smart pointers to these types can be
// declared here without pulling in their full definitions.
class ExecEnv;
class PExecPlanFragmentStartRequest;
class PMergeFilterRequest;
class PPublishFilterRequest;
class PlanFragmentExecutor;
class QueryContext;
class RuntimeProfile;
class RuntimeState;
class TExecPlanFragmentParams;
class TPipelineFragmentParams;
class TPipelineInstanceParams;
class TScanColumnDesc;
class TScanOpenParams;
class Thread;
class ThreadPool;
class WorkloadQueryInfo;

// Maps the load-error file `file_name` to an HTTP path.
// NOTE(review): the exact path format is defined out of line — confirm against the definition.
std::string to_load_error_http_path(const std::string& file_name);
70
71
template <typename Key, typename Value, typename ValueType>
72
class ConcurrentContextMap {
73
public:
74
    using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>;
75
    ConcurrentContextMap();
76
    Value find(const Key& query_id);
77
    void insert(const Key& query_id, std::shared_ptr<ValueType>);
78
    void clear();
79
    void erase(const Key& query_id);
80
4
    size_t num_items() const {
81
4
        size_t n = 0;
82
512
        for (auto& pair : _internal_map) {
83
512
            std::shared_lock lock(*pair.first);
84
512
            auto& map = pair.second;
85
512
            n += map.size();
86
512
        }
87
4
        return n;
88
4
    }
_ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E9num_itemsEv
Line
Count
Source
80
4
    size_t num_items() const {
81
4
        size_t n = 0;
82
512
        for (auto& pair : _internal_map) {
83
512
            std::shared_lock lock(*pair.first);
84
512
            auto& map = pair.second;
85
512
            n += map.size();
86
512
        }
87
4
        return n;
88
4
    }
Unexecuted instantiation: _ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E9num_itemsEv
Unexecuted instantiation: _ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E9num_itemsEv
89
20
    void apply(ApplyFunction&& function) {
90
2.56k
        for (auto& pair : _internal_map) {
91
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
92
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
93
2.56k
            std::unique_lock lock(*pair.first);
94
2.56k
            static_cast<void>(function(pair.second));
95
2.56k
        }
96
20
    }
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S5_NS9_4HashIS1_EENS9_7EqualToIS1_EESaISt4pairIKS1_S5_EEEEEE
Line
Count
Source
89
8
    void apply(ApplyFunction&& function) {
90
1.02k
        for (auto& pair : _internal_map) {
91
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
92
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
93
1.02k
            std::unique_lock lock(*pair.first);
94
1.02k
            static_cast<void>(function(pair.second));
95
1.02k
        }
96
8
    }
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE
Line
Count
Source
89
4
    void apply(ApplyFunction&& function) {
90
512
        for (auto& pair : _internal_map) {
91
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
92
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
93
512
            std::unique_lock lock(*pair.first);
94
512
            static_cast<void>(function(pair.second));
95
512
        }
96
4
    }
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE
Line
Count
Source
89
8
    void apply(ApplyFunction&& function) {
90
1.02k
        for (auto& pair : _internal_map) {
91
            // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must
92
            // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok
93
1.02k
            std::unique_lock lock(*pair.first);
94
1.02k
            static_cast<void>(function(pair.second));
95
1.02k
        }
96
8
    }
97
98
    Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx,
99
                               ApplyFunction&& function);
100
101
private:
102
    // The lock should only be used to protect the structures in fragment manager. Has to be
103
    // used in a very small scope because it may dead lock. For example, if the _lock is used
104
    // in prepare stage, the call path is  prepare --> expr prepare --> may call allocator
105
    // when allocate failed, allocator may call query_is_cancelled, query is callced will also
106
    // call _lock, so that there is dead lock.
107
    std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>>
108
            _internal_map;
109
};
110
111
// This class used to manage all the fragment execute in this instance
112
class FragmentMgr : public RestMonitorIface {
113
public:
114
    using FinishCallback = std::function<void(RuntimeState*, Status*)>;
115
116
    FragmentMgr(ExecEnv* exec_env);
117
    ~FragmentMgr() override;
118
119
    void stop();
120
121
    // execute one plan fragment
122
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type);
123
124
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type);
125
126
    void remove_pipeline_context(
127
            std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
128
129
    // TODO(zc): report this is over
130
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type,
131
                              const FinishCallback& cb);
132
133
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
134
                              const FinishCallback& cb);
135
136
    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
137
138
    Status trigger_pipeline_context_report(const ReportStatusRequest,
139
                                           std::shared_ptr<pipeline::PipelineFragmentContext>&&);
140
141
    // Cancel instance (pipeline or nonpipeline).
142
    void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason,
143
                         const std::string& msg = "");
144
    // Cancel fragment (only pipelineX).
145
    // {query id fragment} -> PipelineXFragmentContext
146
    void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id,
147
                         const PPlanFragmentCancelReason& reason, const std::string& msg = "");
148
149
    // Can be used in both version.
150
    void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
151
                      const std::string& msg = "");
152
153
    void cancel_worker();
154
155
    void debug(std::stringstream& ss) override;
156
157
    // input: TScanOpenParams fragment_instance_id
158
    // output: selected_columns
159
    // execute external query, all query info are packed in TScanOpenParams
160
    Status exec_external_plan_fragment(const TScanOpenParams& params,
161
                                       const TUniqueId& fragment_instance_id,
162
                                       std::vector<TScanColumnDesc>* selected_columns);
163
164
    Status apply_filter(const PPublishFilterRequest* request,
165
                        butil::IOBufAsZeroCopyInputStream* attach_data);
166
167
    Status apply_filterv2(const PPublishFilterRequestV2* request,
168
                          butil::IOBufAsZeroCopyInputStream* attach_data);
169
170
    Status merge_filter(const PMergeFilterRequest* request,
171
                        butil::IOBufAsZeroCopyInputStream* attach_data);
172
173
    Status send_filter_size(const PSendFilterSizeRequest* request);
174
175
    Status sync_filter_size(const PSyncFilterSizeRequest* request);
176
177
    std::string to_http_path(const std::string& file_name);
178
179
    void coordinator_callback(const ReportStatusRequest& req);
180
181
0
    ThreadPool* get_thread_pool() { return _thread_pool.get(); }
182
183
    std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id);
184
185
0
    int32_t running_query_num() { return _query_ctx_map.num_items(); }
186
187
    std::string dump_pipeline_tasks(int64_t duration = 0);
188
    std::string dump_pipeline_tasks(TUniqueId& query_id);
189
190
    void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list);
191
192
private:
193
    void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason,
194
                              const std::unique_lock<std::mutex>& state_lock, bool is_pipeline,
195
                              const std::string& msg = "");
196
197
    void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
198
                      const FinishCallback& cb);
199
    struct BrpcItem {
200
        TNetworkAddress network_address;
201
        std::vector<std::weak_ptr<QueryContext>> queries;
202
    };
203
204
    std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
205
206
    template <typename Param>
207
    void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
208
209
    void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
210
                                                    QueryContext* query_ctx);
211
212
    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
213
                                                    const TPipelineInstanceParams& local_params,
214
                                                    QueryContext* query_ctx);
215
216
    void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
217
                                                    QueryContext* query_ctx);
218
219
    template <typename Params>
220
    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
221
                          QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
222
223
    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
224
                               const BrpcItem& brpc_item);
225
226
    // This is input params
227
    ExecEnv* _exec_env = nullptr;
228
229
    // Make sure that remove this before no data reference PlanFragmentExecutor
230
    // (QueryID, FragmentID) -> PipelineFragmentContext
231
    ConcurrentContextMap<TUniqueId, std::shared_ptr<PlanFragmentExecutor>, PlanFragmentExecutor>
232
            _fragment_instance_map;
233
    // (QueryID, FragmentID) -> PipelineFragmentContext
234
    ConcurrentContextMap<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>,
235
                         pipeline::PipelineFragmentContext>
236
            _pipeline_map;
237
238
    // query id -> QueryContext
239
    ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext> _query_ctx_map;
240
    std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
241
242
    CountDownLatch _stop_background_threads_latch;
243
    scoped_refptr<Thread> _cancel_thread;
244
    // every job is a pool
245
    std::unique_ptr<ThreadPool> _thread_pool;
246
247
    std::shared_ptr<MetricEntity> _entity;
248
    UIntGauge* timeout_canceled_fragment_count = nullptr;
249
250
    RuntimeFilterMergeController _runtimefilter_controller;
251
    std::unique_ptr<ThreadPool> _async_report_thread_pool =
252
            nullptr; // used for pipeliine context report
253
};
254
255
} // namespace doris