Coverage Report

Created: 2025-03-10 17:42

/root/doris/be/src/service/backend_service.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/BackendService.h>
21
22
#include <memory>
23
#include <string>
24
#include <vector>
25
26
#include "agent/agent_server.h"
27
#include "agent/topic_subscriber.h"
28
#include "common/status.h"
29
#include "runtime/stream_load/stream_load_recorder.h"
30
31
namespace doris {
32
33
// Forward declarations of types that the class declarations below name only by
// reference, pointer, or as a template argument.
// NOTE(review): TTransmitDataResult and TTransmitDataParams are declared here but
// are not referenced anywhere else in this header.
// NOTE(review): other types used below (e.g. TPublishTopicResult,
// TQueryIngestBinlogRequest) have no forward declaration here, so they presumably
// come from one of the included headers -- confirm.
class StorageEngine;
34
class ExecEnv;
35
class ThriftServer;
36
class TAgentResult;
37
class TAgentTaskRequest;
38
class TAgentPublishRequest;
39
class TExecPlanFragmentParams;
40
class TExecPlanFragmentResult;
41
class TCancelPlanFragmentResult;
42
class TTransmitDataResult;
43
class TExportTaskRequest;
44
class TExportStatusResult;
45
class TStreamLoadRecordResult;
46
class TDiskTrashInfo;
47
class TCancelPlanFragmentParams;
48
class TCheckStorageFormatResult;
49
class TRoutineLoadTask;
50
class TScanBatchResult;
51
class TScanCloseParams;
52
class TScanCloseResult;
53
class TScanNextBatchParams;
54
class TScanOpenParams;
55
class TScanOpenResult;
56
class TSnapshotRequest;
57
class TStatus;
58
class TTabletStatResult;
59
class TTransmitDataParams;
60
class TUniqueId;
61
class TIngestBinlogRequest;
62
class TIngestBinlogResult;
63
class ThreadPool;
64
65
// This class only forwards each RPC to its actual handler.
66
// It exists as a separate class so that multiple services can be bound to a single point.
67
class BaseBackendService : public BackendServiceIf {
68
public:
69
    // NOTE(review): this single-argument constructor is not marked `explicit`, so an
    // ExecEnv* is implicitly convertible wherever the conversion is otherwise viable.
    BaseBackendService(ExecEnv* exec_env);
70
71
    ~BaseBackendService() override;
72
73
    // Agent service
74
    // Passes return_value and tasks straight through to AgentServer::submit_tasks.
    void submit_tasks(TAgentResult& return_value,
75
0
                      const std::vector<TAgentTaskRequest>& tasks) override {
76
0
        _agent_server->submit_tasks(return_value, tasks);
77
0
    }
78
79
0
    // Passes result and request straight through to AgentServer::publish_cluster_state.
    void publish_cluster_state(TAgentResult& result, const TAgentPublishRequest& request) override {
80
0
        _agent_server->publish_cluster_state(result, request);
81
0
    }
82
83
    // Hands topic_request to the agent server's topic subscriber.
    // `result` is not read or written by this inline body.
    void publish_topic_info(TPublishTopicResult& result,
84
0
                            const TPublishTopicRequest& topic_request) override {
85
0
        _agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
86
0
    }
87
88
    // DorisServer service
89
    void exec_plan_fragment(TExecPlanFragmentResult& return_val,
90
                            const TExecPlanFragmentParams& params) override;
91
92
    // Empty inline body: this implementation does nothing with params and leaves
    // return_val untouched.
    // NOTE(review): the ';' after '{}' is a redundant empty declaration.
    void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
93
0
                              const TCancelPlanFragmentParams& params) override {};
94
95
    void submit_export_task(TStatus& t_status, const TExportTaskRequest& request) override;
96
97
    void get_export_status(TExportStatusResult& result, const TUniqueId& task_id) override;
98
99
    void erase_export_task(TStatus& t_status, const TUniqueId& task_id) override;
100
101
    void submit_routine_load_task(TStatus& t_status,
102
                                  const std::vector<TRoutineLoadTask>& tasks) override;
103
104
    // used for external service, open means start the scan procedure
105
    void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
106
107
    // used for external service, external use getNext to fetch data batch after batch until eos = true
108
    void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
109
110
    // used for external service, close some context and release resource related with this context
111
    void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
112
113
    ////////////////////////////////////////////////////////////////////////////
114
    // begin local backend functions
115
    ////////////////////////////////////////////////////////////////////////////
116
    void get_tablet_stat(TTabletStatResult& result) override;
117
118
    int64_t get_trash_used_capacity() override;
119
120
    void get_stream_load_record(TStreamLoadRecordResult& result,
121
                                int64_t last_stream_record_time) override;
122
123
    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
124
125
    void make_snapshot(TAgentResult& return_value,
126
                       const TSnapshotRequest& snapshot_request) override;
127
128
    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override;
129
130
    void check_storage_format(TCheckStorageFormatResult& result) override;
131
132
    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
133
134
    void query_ingest_binlog(TQueryIngestBinlogResult& result,
135
                             const TQueryIngestBinlogRequest& request) override;
136
137
    void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
138
                                  const TGetRealtimeExecStatusRequest& request) override;
139
140
    ////////////////////////////////////////////////////////////////////////////
141
    // begin cloud backend functions
142
    ////////////////////////////////////////////////////////////////////////////
143
    void warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
144
                             const TWarmUpCacheAsyncRequest& request) override;
145
146
    void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
147
                                   const TCheckWarmUpCacheAsyncRequest& request) override;
148
149
    // If another cluster load, FE need to notify the cluster to sync the load data
150
    void sync_load_for_tablets(TSyncLoadForTabletsResponse& response,
151
                               const TSyncLoadForTabletsRequest& request) override;
152
153
    void get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
154
                                  const TGetTopNHotPartitionsRequest& request) override;
155
156
    void warm_up_tablets(TWarmUpTabletsResponse& response,
157
                         const TWarmUpTabletsRequest& request) override;
158
159
0
    // Calls AgentServer::stop_report_workers on the owned agent server.
    // Declared without `override`, i.e. it is not an overridden interface method.
    void stop_works() { _agent_server->stop_report_workers(); }
160
161
protected:
162
    // Declared here and defined elsewhere; reports its outcome as a Status.
    Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
163
164
    // Overload of the public get_stream_load_record that additionally takes the
    // StreamLoadRecorder to work with (shared_ptr passed by value).
    void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time,
165
                                std::shared_ptr<StreamLoadRecorder> stream_load_recorder);
166
167
    // Raw pointer, nullptr by default; presumably set from the constructor argument
    // and owned elsewhere -- confirm against the constructor definition.
    ExecEnv* _exec_env = nullptr;
168
    // Agent server owned by this service. The inline forwarders above dereference it
    // without a null check, so it must be set before any of them is called.
    std::unique_ptr<AgentServer> _agent_server;
169
    // Thread pool owned by this service; from the name, presumably used by the
    // ingest_binlog handling -- confirm in the implementation file.
    std::unique_ptr<ThreadPool> _ingest_binlog_workers;
170
};
171
172
// `StorageEngine` mixin for `BaseBackendService`
// A final subclass that holds a StorageEngine reference and overrides nine of the
// base class's "local backend functions" (every function in that section except
// get_realtime_exec_status).
173
class BackendService final : public BaseBackendService {
174
public:
175
    // NOTE: now we do not support multiple backend in one process
176
    // `server` is a pointer to a unique_ptr<ThriftServer> -- presumably an
    // out-parameter that receives the created server; confirm against the definition.
    // `service` is a shared_ptr taken by value, so reassigning it inside the function
    // would not be visible to the caller.
    static Status create_service(StorageEngine& engine, ExecEnv* exec_env, int port,
177
                                 std::unique_ptr<ThriftServer>* server,
178
                                 std::shared_ptr<doris::BackendService> service);
179
180
    BackendService(StorageEngine& engine, ExecEnv* exec_env);
181
182
    ~BackendService() override;
183
184
    // The declarations below override the same-named virtual functions of
    // BaseBackendService; their definitions are not part of this header.
    void get_tablet_stat(TTabletStatResult& result) override;
185
186
    int64_t get_trash_used_capacity() override;
187
188
    void get_stream_load_record(TStreamLoadRecordResult& result,
189
                                int64_t last_stream_record_time) override;
190
191
    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
192
193
    void make_snapshot(TAgentResult& return_value,
194
                       const TSnapshotRequest& snapshot_request) override;
195
196
    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override;
197
198
    void check_storage_format(TCheckStorageFormatResult& result) override;
199
200
    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
201
202
    void query_ingest_binlog(TQueryIngestBinlogResult& result,
203
                             const TQueryIngestBinlogRequest& request) override;
204
205
private:
206
    // Reference member: the service does not own the engine, so the referenced
    // StorageEngine has to outlive this object.
    StorageEngine& _engine;
207
};
208
209
} // namespace doris