Coverage Report

Created: 2026-05-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/backend_service.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/BackendService.h>
21
22
#include <memory>
23
#include <string>
24
#include <vector>
25
26
#include "agent/agent_server.h"
27
#include "agent/topic_subscriber.h"
28
#include "common/status.h"
29
#include "load/stream_load/stream_load_recorder.h"
30
31
namespace doris {
32
33
class StorageEngine;
34
class ExecEnv;
35
class TAgentResult;
36
class TAgentTaskRequest;
37
class TAgentPublishRequest;
38
class TStreamLoadRecordResult;
39
class TDiskTrashInfo;
40
class TCheckStorageFormatResult;
41
class TRoutineLoadTask;
42
class TScanBatchResult;
43
class TScanCloseParams;
44
class TScanCloseResult;
45
class TScanNextBatchParams;
46
class TScanOpenParams;
47
class TScanOpenResult;
48
class TSnapshotRequest;
49
class TStatus;
50
class TTabletStatResult;
51
class TUniqueId;
52
class TIngestBinlogRequest;
53
class TIngestBinlogResult;
54
class ThreadPool;
55
56
// This class just forward rpc for actual handler
57
// make this class because we can bind multiple service on single point
58
class BaseBackendService : public BackendServiceIf {
59
public:
60
    BaseBackendService(ExecEnv* exec_env);
61
62
    ~BaseBackendService() override;
63
64
    // Start runtime workers that the thrift server depends on (agent workers,
65
    // ingest-binlog thread pool, etc.). Must be called before constructing the
66
    // thrift server. The name makes the side effects explicit: this is not a
67
    // lightweight preparation hook.
68
    virtual Status start_thrift_dependencies() = 0;
69
70
    // Agent service
71
    void submit_tasks(TAgentResult& return_value,
72
0
                      const std::vector<TAgentTaskRequest>& tasks) override {
73
0
        _agent_server->submit_tasks(return_value, tasks);
74
0
    }
75
76
0
    void publish_cluster_state(TAgentResult& result, const TAgentPublishRequest& request) override {
77
0
        _agent_server->publish_cluster_state(result, request);
78
0
    }
79
80
    void publish_topic_info(TPublishTopicResult& result,
81
0
                            const TPublishTopicRequest& topic_request) override {
82
0
        _agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
83
0
    }
84
85
    void submit_routine_load_task(TStatus& t_status,
86
                                  const std::vector<TRoutineLoadTask>& tasks) override;
87
88
    // used for external service, open means start the scan procedure
89
    void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
90
91
    // used for external service, external use getNext to fetch data batch after batch until eos = true
92
    void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
93
94
    // used for external service, close some context and release resource related with this context
95
    void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
96
97
    ////////////////////////////////////////////////////////////////////////////
98
    // begin local backend functions
99
    ////////////////////////////////////////////////////////////////////////////
100
    void get_tablet_stat(TTabletStatResult& result) override;
101
102
    int64_t get_trash_used_capacity() override;
103
104
    void get_stream_load_record(TStreamLoadRecordResult& result,
105
                                int64_t last_stream_record_time) override;
106
107
    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
108
109
    void make_snapshot(TAgentResult& return_value,
110
                       const TSnapshotRequest& snapshot_request) override;
111
112
    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override;
113
114
    void check_storage_format(TCheckStorageFormatResult& result) override;
115
116
    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
117
118
    void query_ingest_binlog(TQueryIngestBinlogResult& result,
119
                             const TQueryIngestBinlogRequest& request) override;
120
121
    void get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
122
                                  const TGetRealtimeExecStatusRequest& request) override;
123
124
    void get_dictionary_status(TDictionaryStatusList& result,
125
                               const std::vector<int64_t>& dictionary_id) override;
126
127
    void test_storage_connectivity(TTestStorageConnectivityResponse& response,
128
                                   const TTestStorageConnectivityRequest& request) override;
129
130
    void get_python_envs(std::vector<TPythonEnvInfo>& result) override;
131
132
    void get_python_packages(std::vector<TPythonPackageInfo>& result,
133
                             const std::string& python_version) override;
134
135
    ////////////////////////////////////////////////////////////////////////////
136
    // begin cloud backend functions
137
    ////////////////////////////////////////////////////////////////////////////
138
    void warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
139
                             const TWarmUpCacheAsyncRequest& request) override;
140
141
    void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
142
                                   const TCheckWarmUpCacheAsyncRequest& request) override;
143
144
    // If another cluster load, FE need to notify the cluster to sync the load data
145
    void sync_load_for_tablets(TSyncLoadForTabletsResponse& response,
146
                               const TSyncLoadForTabletsRequest& request) override;
147
148
    void get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
149
                                  const TGetTopNHotPartitionsRequest& request) override;
150
151
    void warm_up_tablets(TWarmUpTabletsResponse& response,
152
                         const TWarmUpTabletsRequest& request) override;
153
154
0
    void stop_works() { _agent_server->stop_report_workers(); }
155
156
protected:
157
    void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time,
158
                                std::shared_ptr<StreamLoadRecorder> stream_load_recorder);
159
160
    ExecEnv* _exec_env = nullptr;
161
    std::unique_ptr<AgentServer> _agent_server;
162
    std::unique_ptr<ThreadPool> _ingest_binlog_workers;
163
};
164
165
// `StorageEngine` mixin for `BaseBackendService`
166
class BackendService final : public BaseBackendService {
167
public:
168
    BackendService(StorageEngine& engine, ExecEnv* exec_env);
169
170
    ~BackendService() override;
171
172
    Status start_thrift_dependencies() override;
173
174
    void get_tablet_stat(TTabletStatResult& result) override;
175
176
    int64_t get_trash_used_capacity() override;
177
178
    void get_stream_load_record(TStreamLoadRecordResult& result,
179
                                int64_t last_stream_record_time) override;
180
181
    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
182
183
    void make_snapshot(TAgentResult& return_value,
184
                       const TSnapshotRequest& snapshot_request) override;
185
186
    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override;
187
188
    void check_storage_format(TCheckStorageFormatResult& result) override;
189
190
    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
191
192
    void query_ingest_binlog(TQueryIngestBinlogResult& result,
193
                             const TQueryIngestBinlogRequest& request) override;
194
195
private:
196
    StorageEngine& _engine;
197
};
198
199
} // namespace doris