Coverage Report

Created: 2024-11-20 15:52

/root/doris/be/src/runtime/fragment_mgr.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/FrontendService_types.h>
21
#include <gen_cpp/QueryPlanExtra_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stdint.h>
25
26
#include <condition_variable>
27
#include <functional>
28
#include <iosfwd>
29
#include <memory>
30
#include <mutex>
31
#include <string>
32
#include <unordered_map>
33
#include <vector>
34
35
#include "common/status.h"
36
#include "gutil/ref_counted.h"
37
#include "http/rest_monitor_iface.h"
38
#include "runtime/query_context.h"
39
#include "runtime_filter_mgr.h"
40
#include "util/countdown_latch.h"
41
#include "util/hash_util.hpp" // IWYU pragma: keep
42
#include "util/metrics.h"
43
44
// Forward declaration only: this header refers to butil::IOBufAsZeroCopyInputStream
// solely through pointers (see FragmentMgr::apply_filterv2 and FragmentMgr::merge_filter).
namespace butil {
45
class IOBufAsZeroCopyInputStream;
46
}
47
48
namespace doris {
49
// Global bvar variables declared here and defined in another translation unit.
// NOTE(review): no bvar header is included directly by this file; presumably it
// arrives transitively through one of the includes above — confirm.
extern bvar::Adder<uint64_t> g_fragment_executing_count;
50
extern bvar::Status<uint64_t> g_fragment_last_active_time;
51
52
// Forward declarations for types named by the declarations in this header.
// NOTE(review): PPublishFilterRequest, RuntimeProfile and TPipelineInstanceParams are
// not referenced anywhere in the visible part of this header (apply_filterv2 takes
// PPublishFilterRequestV2) — confirm whether they are still needed.
// NOTE(review): QueryContext is forward-declared here although
// "runtime/query_context.h" is also included above.
namespace pipeline {
53
class PipelineFragmentContext;
54
} // namespace pipeline
55
class QueryContext;
56
class ExecEnv;
57
class ThreadPool;
58
class TExecPlanFragmentParams;
59
class PExecPlanFragmentStartRequest;
60
class PMergeFilterRequest;
61
class PPublishFilterRequest;
62
class RuntimeProfile;
63
class RuntimeState;
64
class TPipelineFragmentParams;
65
class TPipelineInstanceParams;
66
class TScanColumnDesc;
67
class TScanOpenParams;
68
class Thread;
69
class WorkloadQueryInfo;
70
71
// Free function taking a file name and returning a std::string; only declared here.
// NOTE(review): FragmentMgr::to_http_path has the same parameter and return types —
// presumably the two are related; confirm against the definitions.
std::string to_load_error_http_path(const std::string& file_name);
72
73
// FragmentMgr manages all the fragment executions running in this instance.
// It derives from RestMonitorIface and overrides debug(). Its internal
// structures are meant to be protected by _lock (see the comment on that member).
74
class FragmentMgr : public RestMonitorIface {
75
public:
76
    // Callback type receiving a RuntimeState* and a Status*; accepted by the
    // exec_plan_fragment overloads that take a `cb` parameter.
    using FinishCallback = std::function<void(RuntimeState*, Status*)>;
77
78
    // NOTE(review): single-argument constructor is not marked explicit.
    FragmentMgr(ExecEnv* exec_env);
79
    ~FragmentMgr() override;
80
81
    void stop();
82
83
    // Execute one plan fragment.
84
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type);
85
86
    // Overload of the above taking TPipelineFragmentParams.
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type);
87
88
    // NOTE(review): presumably removes the given context from _pipeline_map — confirm
    // against the definition.
    void remove_pipeline_context(
89
            std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
90
91
    // TODO(zc): report this is over
    // Overload that additionally takes a FinishCallback.
    // NOTE(review): presumably `cb` is invoked when the fragment finishes — confirm.
92
    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type,
93
                              const FinishCallback& cb);
94
95
    // Overload taking TPipelineFragmentParams and a FinishCallback.
    Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type,
96
                              const FinishCallback& cb);
97
98
    Status start_query_execution(const PExecPlanFragmentStartRequest* request);
99
100
    // Takes a ReportStatusRequest by value and an rvalue reference to a shared_ptr
    // of PipelineFragmentContext; both parameters are unnamed in this declaration.
    Status trigger_pipeline_context_report(const ReportStatusRequest,
101
                                           std::shared_ptr<pipeline::PipelineFragmentContext>&&);
102
103
    // Can be used in both versions.
    // NOTE(review): query_id and reason are passed by const value, so each call copies
    // them; const references would avoid the copies if callers permit.
104
    void cancel_query(const TUniqueId query_id, const Status reason);
105
106
    // NOTE(review): presumably the routine run by _cancel_thread — confirm.
    void cancel_worker();
107
108
    // Override of the RestMonitorIface interface; writes into the given stringstream.
    void debug(std::stringstream& ss) override;
109
110
    // input: TQueryPlanInfo fragment_instance_id
111
    // output: selected_columns
112
    // execute external query, all query info are packed in TScanOpenParams
113
    Status exec_external_plan_fragment(const TScanOpenParams& params,
114
                                       const TQueryPlanInfo& t_query_plan_info,
115
                                       const TUniqueId& query_id,
116
                                       const TUniqueId& fragment_instance_id,
117
                                       std::vector<TScanColumnDesc>* selected_columns);
118
119
    Status apply_filterv2(const PPublishFilterRequestV2* request,
120
                          butil::IOBufAsZeroCopyInputStream* attach_data);
121
122
    Status merge_filter(const PMergeFilterRequest* request,
123
                        butil::IOBufAsZeroCopyInputStream* attach_data);
124
125
    Status send_filter_size(const PSendFilterSizeRequest* request);
126
127
    Status sync_filter_size(const PSyncFilterSizeRequest* request);
128
129
    std::string to_http_path(const std::string& file_name);
130
131
    void coordinator_callback(const ReportStatusRequest& req);
132
133
0
    // Returns a non-owning raw pointer to the ThreadPool held by _thread_pool
    // (a std::unique_ptr); the result is null when _thread_pool is empty.
    ThreadPool* get_thread_pool() { return _thread_pool.get(); }
134
135
0
    // Returns the number of entries in _query_ctx_map, read while holding _lock.
    // The map's size_t size is implicitly narrowed to int32_t on return.
    // NOTE(review): the map stores weak_ptr<QueryContext>, so this counts map entries,
    // which may include contexts that have already expired — confirm against the
    // code that erases entries.
    int32_t running_query_num() {
136
0
        std::unique_lock<std::mutex> ctx_lock(_lock);
137
0
        return _query_ctx_map.size();
138
0
    }
139
140
    std::string dump_pipeline_tasks(int64_t duration = 0);
141
    // NOTE(review): query_id is taken by non-const reference — confirm whether the
    // definition modifies it; otherwise const TUniqueId& would be clearer.
    std::string dump_pipeline_tasks(TUniqueId& query_id);
142
143
    // NOTE(review): the pointer parameter is presumably an output list filled by this
    // call; its name carries the leading underscore otherwise used for members.
    void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list);
144
145
    Status get_realtime_exec_status(const TUniqueId& query_id,
146
                                    TReportExecStatusParams* exec_status);
147
148
    // NOTE(review): the name suggests this acquires _lock and then performs the same
    // work as the private _get_or_erase_query_ctx — confirm against the definition.
    std::shared_ptr<QueryContext> get_or_erase_query_ctx_with_lock(const TUniqueId& query_id);
149
150
private:
151
    // Bundles a network address with a list of weak references to QueryContext
    // objects; passed to _check_brpc_available.
    struct BrpcItem {
152
        TNetworkAddress network_address;
153
        std::vector<std::weak_ptr<QueryContext>> queries;
154
    };
155
156
    std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id);
157
158
    template <typename Param>
159
    void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
160
161
    // The resulting context is handed back through the `query_ctx` reference parameter.
    template <typename Params>
162
    Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
163
                          QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx);
164
165
    void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
166
                               const BrpcItem& brpc_item);
167
168
    // This is input params
169
    ExecEnv* _exec_env = nullptr;
170
171
    // This lock should only be used to protect the structures in the fragment manager. It has to
172
    // be held in a very small scope because it may deadlock. For example, if _lock is held
173
    // in the prepare stage, the call path is prepare --> expr prepare --> may call the allocator;
174
    // when allocation fails, the allocator may call query_is_cancelled, which will also
175
    // acquire _lock, resulting in a deadlock.
176
    std::mutex _lock;
177
178
    // (QueryID, FragmentID) -> PipelineFragmentContext
179
    std::unordered_map<std::pair<TUniqueId, int>,
180
                       std::shared_ptr<pipeline::PipelineFragmentContext>>
181
            _pipeline_map;
182
183
    // query id -> QueryContext (held as weak_ptr, so the map itself does not keep
    // a QueryContext alive)
184
    std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
185
    // NOTE(review): query id -> (int -> int64_t); presumably filter id -> filter size
    // for send_filter_size / sync_filter_size — confirm against the definitions.
    std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map;
186
187
    CountDownLatch _stop_background_threads_latch;
188
    scoped_refptr<Thread> _cancel_thread;
189
    // every job is a pool
190
    std::unique_ptr<ThreadPool> _thread_pool;
191
192
    std::shared_ptr<MetricEntity> _entity;
193
    // NOTE(review): unlike the other data members, this one has no leading underscore.
    UIntGauge* timeout_canceled_fragment_count = nullptr;
194
195
    RuntimeFilterMergeController _runtimefilter_controller;
196
};
197
198
// Free functions returning uint64_t; only declared in this header.
// NOTE(review): presumably these expose the current values of
// g_fragment_executing_count and g_fragment_last_active_time — confirm against
// the definitions.
uint64_t get_fragment_executing_count();
199
uint64_t get_fragment_last_active_time();
200
201
} // namespace doris