Coverage Report

Created: 2024-11-20 15:53

/root/doris/be/src/service/backend_service.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/BackendService.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <string>
25
#include <vector>
26
27
#include "agent/agent_server.h"
28
#include "agent/topic_subscriber.h"
29
#include "common/status.h"
30
31
namespace doris {
32
33
class ExecEnv;
34
class ThriftServer;
35
class TAgentResult;
36
class TAgentTaskRequest;
37
class TAgentPublishRequest;
38
class TExecPlanFragmentParams;
39
class TExecPlanFragmentResult;
40
class TCancelPlanFragmentResult;
41
class TTransmitDataResult;
42
class TExportTaskRequest;
43
class TExportStatusResult;
44
class TStreamLoadRecordResult;
45
class TDiskTrashInfo;
46
class TCancelPlanFragmentParams;
47
class TCheckStorageFormatResult;
48
class TRoutineLoadTask;
49
class TScanBatchResult;
50
class TScanCloseParams;
51
class TScanCloseResult;
52
class TScanNextBatchParams;
53
class TScanOpenParams;
54
class TScanOpenResult;
55
class TSnapshotRequest;
56
class TStatus;
57
class TTabletStatResult;
58
class TTransmitDataParams;
59
class TUniqueId;
60
class TIngestBinlogRequest;
61
class TIngestBinlogResult;
62
class ThreadPool;
63
64
// This class only forwards each RPC to the actual handler.
65
// It exists so that multiple services can be bound to a single endpoint.
66
class BackendService : public BackendServiceIf {
67
public:
68
    BackendService(ExecEnv* exec_env);
69
70
    ~BackendService() override = default;
71
72
    // NOTE: running multiple backends in one process is currently not supported
73
    static Status create_service(ExecEnv* exec_env, int port,
74
                                 std::unique_ptr<ThriftServer>* server);
75
76
    // Agent service
77
    void submit_tasks(TAgentResult& return_value,
78
0
                      const std::vector<TAgentTaskRequest>& tasks) override {
79
0
        _agent_server->submit_tasks(return_value, tasks);
80
0
    }
81
82
    void make_snapshot(TAgentResult& return_value,
83
0
                       const TSnapshotRequest& snapshot_request) override {
84
0
        _agent_server->make_snapshot(return_value, snapshot_request);
85
0
    }
86
87
0
    void release_snapshot(TAgentResult& return_value, const std::string& snapshot_path) override {
88
0
        _agent_server->release_snapshot(return_value, snapshot_path);
89
0
    }
90
91
0
    void publish_cluster_state(TAgentResult& result, const TAgentPublishRequest& request) override {
92
0
        _agent_server->publish_cluster_state(result, request);
93
0
    }
94
95
    void publish_topic_info(TPublishTopicResult& result,
96
0
                            const TPublishTopicRequest& topic_request) override {
97
0
        _agent_server->get_topic_subscriber()->handle_topic_info(topic_request);
98
0
    }
99
100
    // DorisServer service
101
    void exec_plan_fragment(TExecPlanFragmentResult& return_val,
102
                            const TExecPlanFragmentParams& params) override;
103
104
    void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
105
                              const TCancelPlanFragmentParams& params) override;
106
107
    void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;
108
109
    void submit_export_task(TStatus& t_status, const TExportTaskRequest& request) override;
110
111
    void get_export_status(TExportStatusResult& result, const TUniqueId& task_id) override;
112
113
    void erase_export_task(TStatus& t_status, const TUniqueId& task_id) override;
114
115
    void get_tablet_stat(TTabletStatResult& result) override;
116
117
    int64_t get_trash_used_capacity() override;
118
119
    void get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) override;
120
121
    void submit_routine_load_task(TStatus& t_status,
122
                                  const std::vector<TRoutineLoadTask>& tasks) override;
123
124
    // Used by external services; "open" starts the scan procedure.
125
    void open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) override;
126
127
    // Used by external services; the caller invokes getNext repeatedly to fetch data batch by batch until eos = true.
128
    void get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) override;
129
130
    // Used by external services; closes a context and releases the resources associated with it.
131
    void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) override;
132
133
    void get_stream_load_record(TStreamLoadRecordResult& result,
134
                                const int64_t last_stream_record_time) override;
135
136
    void check_storage_format(TCheckStorageFormatResult& result) override;
137
138
    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;
139
140
    void query_ingest_binlog(TQueryIngestBinlogResult& result,
141
                             const TQueryIngestBinlogRequest& request) override;
142
143
private:
144
    Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
145
    ExecEnv* _exec_env = nullptr;
146
    std::unique_ptr<AgentServer> _agent_server;
147
    std::unique_ptr<ThreadPool> _ingest_binlog_workers;
148
};
149
150
} // namespace doris