Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/agent_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/agent_server.h"
19
20
#include <gen_cpp/AgentService_types.h>
21
#include <gen_cpp/HeartbeatService_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <stdint.h>
24
#include <thrift/protocol/TDebugProtocol.h>
25
26
#include <filesystem>
27
#include <memory>
28
#include <ostream>
29
#include <string>
30
31
#include "agent/task_worker_pool.h"
32
#include "agent/topic_subscriber.h"
33
#include "agent/utils.h"
34
#include "agent/workload_group_listener.h"
35
#include "agent/workload_sched_policy_listener.h"
36
#include "cloud/config.h"
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "common/status.h"
40
#include "runtime/exec_env.h"
41
#include "storage/olap_define.h"
42
#include "storage/options.h"
43
#include "storage/snapshot/snapshot_manager.h"
44
#include "storage/storage_engine.h"
45
#include "util/work_thread_pool.hpp"
46
47
namespace doris {
48
49
// Constructs the agent server.
//
// Stores `cluster_info`, creates the topic subscriber and initialises the
// master-server client from `cluster_info`. Unless the build defines BE_TEST or
// targets __APPLE__, it also registers the workload-group and
// workload-scheduler-policy topic listeners, each built from `exec_env`.
AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info)
        : _cluster_info(cluster_info), _topic_subscriber(std::make_unique<TopicSubscriber>()) {
    MasterServerClient::create(cluster_info);

#if !defined(BE_TEST) && !defined(__APPLE__)
    // Each listener is created first, announced in the log, and then handed over
    // to the subscriber, which takes ownership of it.
    std::unique_ptr<TopicListener> workload_group_listener =
            std::make_unique<WorkloadGroupListener>(exec_env);
    LOG(INFO) << "Register workload group listener";
    _topic_subscriber->register_listener(TTopicInfoType::WORKLOAD_GROUP,
                                         std::move(workload_group_listener));

    std::unique_ptr<TopicListener> sched_policy_listener =
            std::make_unique<WorkloadschedPolicyListener>(exec_env);
    LOG(INFO) << "Register workload scheduler policy listener";
    _topic_subscriber->register_listener(TTopicInfoType::WORKLOAD_SCHED_POLICY,
                                         std::move(sched_policy_listener));
#endif
}
68
69
3
// Defaulted destructor, defined out of line in this translation unit; cleanup is
// left entirely to the destructors of the data members.
AgentServer::~AgentServer() = default;
70
71
class PushTaskWorkerPool final : public TaskWorkerPoolIf {
72
public:
73
    PushTaskWorkerPool(StorageEngine& engine)
74
            : _push_delete_workers(
75
6
                      TaskWorkerPool("DELETE", config::delete_worker_count,
76
22
                                     [&engine](auto&& task) { push_callback(engine, task); })),
77
6
              _push_load_workers(PriorTaskWorkerPool(
78
6
                      "PUSH", config::push_worker_count_normal_priority,
79
6
                      config::push_worker_count_high_priority,
80
6
                      [&engine](auto&& task) { push_callback(engine, task); })) {}
81
82
3
    ~PushTaskWorkerPool() override { stop(); }
83
84
3
    void stop() {
85
3
        _push_delete_workers.stop();
86
3
        _push_load_workers.stop();
87
3
    }
88
89
22
    Status submit_task(const TAgentTaskRequest& task) override {
90
22
        if (task.push_req.push_type == TPushType::LOAD_V2) {
91
0
            return _push_load_workers.submit_task(task);
92
22
        } else if (task.push_req.push_type == TPushType::DELETE) {
93
22
            return _push_delete_workers.submit_task(task);
94
22
        } else {
95
0
            return Status::InvalidArgument(
96
0
                    "task(signature={}, type={}, push_type={}) has wrong push_type", task.signature,
97
0
                    task.task_type, task.push_req.push_type);
98
0
        }
99
22
    }
100
101
private:
102
    TaskWorkerPool _push_delete_workers;
103
    PriorTaskWorkerPool _push_load_workers;
104
};
105
106
6
// Starts the task and report workers for the local (non-cloud) storage engine.
//
// Steps:
//   1. Best-effort removal of the DPP download directory under every store path
//      returned by `exec_env`. Failures are logged and otherwise ignored.
//   2. One worker pool is registered in `_workers` per supported task type.
//   3. The periodic report workers are appended to `_report_workers`.
//   4. The address of `_workers` is published to the local storage engine.
//
// The callbacks capture `engine` (and `_cluster_info`) by reference and
// `exec_env` by value (it is a pointer).
void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
    for (const auto& path : exec_env->store_paths()) {
        try {
            std::string dpp_download_path_str = path.path + "/" + DPP_PREFIX;
            std::filesystem::path dpp_download_path(dpp_download_path_str);
            if (std::filesystem::exists(dpp_download_path)) {
                std::filesystem::remove_all(dpp_download_path);
            }
        } catch (const std::filesystem::filesystem_error& e) {
            // std::filesystem reports failures through filesystem_error; keep the
            // reason in the log so a failed cleanup can be diagnosed.
            LOG(WARNING) << "failed to remove dpp download path. path=" << path.path
                         << ", error=" << e.what();
        } catch (...) {
            // The cleanup is best-effort: never let it abort worker start-up.
            LOG(WARNING) << "unknown exception when remove dpp download path. path=" << path.path;
        }
    }

    // clang-format off
    _workers[TTaskType::ALTER_INVERTED_INDEX] = std::make_unique<TaskWorkerPool>(
            "ALTER_INVERTED_INDEX", config::alter_index_worker_count, [&engine](auto&& task) { return alter_inverted_index_callback(engine, task); });

    _workers[TTaskType::CHECK_CONSISTENCY] = std::make_unique<TaskWorkerPool>(
            "CHECK_CONSISTENCY", config::check_consistency_worker_count, [&engine](auto&& task) { return check_consistency_callback(engine, task); });

    _workers[TTaskType::UPLOAD] = std::make_unique<TaskWorkerPool>(
            "UPLOAD", config::upload_worker_count, [&engine, exec_env](auto&& task) { return upload_callback(engine, exec_env, task); });

    _workers[TTaskType::DOWNLOAD] = std::make_unique<TaskWorkerPool>(
            "DOWNLOAD", config::download_worker_count, [&engine, exec_env](auto&& task) { return download_callback(engine, exec_env, task); });

    _workers[TTaskType::MAKE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
            "MAKE_SNAPSHOT", config::make_snapshot_worker_count, [&engine](auto&& task) { return make_snapshot_callback(engine, task); });

    _workers[TTaskType::RELEASE_SNAPSHOT] = std::make_unique<TaskWorkerPool>(
            "RELEASE_SNAPSHOT", config::release_snapshot_worker_count, [&engine](auto&& task) { return release_snapshot_callback(engine, task); });

    _workers[TTaskType::MOVE] = std::make_unique<TaskWorkerPool>(
            "MOVE", 1, [&engine, exec_env](auto&& task) { return move_dir_callback(engine, exec_env, task); });

    _workers[TTaskType::COMPACTION] = std::make_unique<TaskWorkerPool>(
            "SUBMIT_TABLE_COMPACTION", 1, [&engine](auto&& task) { return submit_table_compaction_callback(engine, task); });

    _workers[TTaskType::PUSH_STORAGE_POLICY] = std::make_unique<TaskWorkerPool>(
            "PUSH_STORAGE_POLICY", 1, [&engine](auto&& task) { return push_storage_policy_callback(engine, task); });

    _workers[TTaskType::PUSH_INDEX_POLICY] = std::make_unique<TaskWorkerPool>(
            "PUSH_INDEX_POLICY", 1, [](auto&& task) { return push_index_policy_callback(task); });

    _workers[TTaskType::PUSH_COOLDOWN_CONF] = std::make_unique<TaskWorkerPool>(
            "PUSH_COOLDOWN_CONF", 1, [&engine](auto&& task) { return push_cooldown_conf_callback(engine, task); });

    _workers[TTaskType::CREATE] = std::make_unique<TaskWorkerPool>(
            "CREATE_TABLE", config::create_tablet_worker_count, [&engine](auto&& task) { return create_tablet_callback(engine, task); });

    _workers[TTaskType::DROP] = std::make_unique<TaskWorkerPool>(
            "DROP_TABLE", config::drop_tablet_worker_count, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });

    _workers[TTaskType::PUBLISH_VERSION] = std::make_unique<PublishVersionWorkerPool>(engine);

    _workers[TTaskType::CLEAR_TRANSACTION_TASK] = std::make_unique<TaskWorkerPool>(
            "CLEAR_TRANSACTION_TASK", config::clear_transaction_task_worker_count, [&engine](auto&& task) { return clear_transaction_task_callback(engine, task); });

    _workers[TTaskType::PUSH] = std::make_unique<PushTaskWorkerPool>(engine);

    _workers[TTaskType::UPDATE_TABLET_META_INFO] = std::make_unique<TaskWorkerPool>(
            "UPDATE_TABLET_META_INFO", 1, [&engine](auto&& task) { return update_tablet_meta_callback(engine, task); });

    _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
            "ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); });

    // NOTE(review): clone_worker_count is passed for both count arguments of the
    // prioritized pool — confirm this is intended.
    _workers[TTaskType::CLONE] = std::make_unique<PriorTaskWorkerPool>(
            "CLONE", config::clone_worker_count, config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });

    _workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
            "STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });

    _workers[TTaskType::GC_BINLOG] = std::make_unique<TaskWorkerPool>(
            "GC_BINLOG", 1, [&engine](auto&& task) { return gc_binlog_callback(engine, task); });

    _workers[TTaskType::CLEAN_TRASH] = std::make_unique<TaskWorkerPool>(
            "CLEAN_TRASH", 1, [&engine](auto&& task) { return clean_trash_callback(engine, task); });

    _workers[TTaskType::CLEAN_UDF_CACHE] = std::make_unique<TaskWorkerPool>(
            "CLEAN_UDF_CACHE", 1, [](auto&& task) { return clean_udf_cache_callback(task); });

    _workers[TTaskType::UPDATE_VISIBLE_VERSION] = std::make_unique<TaskWorkerPool>(
            "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_TASK", _cluster_info, config::report_task_interval_seconds, [&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_disk_callback(engine, cluster_info); }));

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_tablet_callback(engine, cluster_info); }));

    _report_workers.push_back(std::make_unique<ReportWorker>(
            "REPORT_INDEX_POLICY", _cluster_info, config::report_index_policy_interval_seconds, [&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); }));
    // clang-format on

    // Publish the worker table to the local storage engine.
    exec_env->storage_engine().to_local().workers = &_workers;
}
205
206
0
// Starts the task and report workers used with the cloud storage engine.
//
// Registers one TaskWorkerPool per supported task type in `_workers` and appends
// the periodic report workers to `_report_workers`. The tablet report worker is
// only created when config::enable_cloud_tablet_report is set.
void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_env) {
    // Registers a TaskWorkerPool under `type`; the remaining arguments are
    // forwarded unchanged to the TaskWorkerPool constructor.
    auto add_pool = [this](TTaskType::type type, auto&&... pool_args) {
        _workers[type] = std::make_unique<TaskWorkerPool>(
                std::forward<decltype(pool_args)>(pool_args)...);
    };
    // Appends a ReportWorker built from the forwarded constructor arguments.
    auto add_reporter = [this](auto&&... worker_args) {
        _report_workers.push_back(std::make_unique<ReportWorker>(
                std::forward<decltype(worker_args)>(worker_args)...));
    };

    add_pool(TTaskType::PUSH, "PUSH", config::delete_worker_count,
             [&engine](auto&& req) { cloud_push_callback(engine, req); });
    // TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker

    add_pool(TTaskType::ALTER, "ALTER_TABLE", config::alter_tablet_worker_count,
             [&engine](auto&& req) { return alter_cloud_tablet_callback(engine, req); });

    add_pool(TTaskType::CALCULATE_DELETE_BITMAP, "CALC_DBM_TASK",
             config::calc_delete_bitmap_worker_count,
             [&engine](auto&& req) { return calc_delete_bitmap_callback(engine, req); });

    // In cloud mode dropping a tablet only clears cache, so a single thread is enough.
    add_pool(TTaskType::DROP, "DROP_TABLE", 1,
             [&engine](auto&& req) { return drop_tablet_callback(engine, req); });

    add_pool(TTaskType::PUSH_INDEX_POLICY, "PUSH_INDEX_POLICY", 1,
             [](auto&& req) { return push_index_policy_callback(req); });

    add_pool(TTaskType::DOWNLOAD, "DOWNLOAD", config::download_worker_count,
             [&engine, exec_env](auto&& req) { return download_callback(engine, exec_env, req); });

    add_pool(TTaskType::MOVE, "MOVE", 1,
             [&engine, exec_env](auto&& req) { return move_dir_callback(engine, exec_env, req); });

    add_pool(TTaskType::RELEASE_SNAPSHOT, "RELEASE_SNAPSHOT",
             config::release_snapshot_worker_count,
             [&engine](auto&& req) { return release_snapshot_callback(engine, req); });

    add_pool(TTaskType::ALTER_INVERTED_INDEX, "ALTER_INVERTED_INDEX",
             config::alter_index_worker_count,
             [&engine](auto&& req) { return alter_cloud_index_callback(engine, req); });

    add_reporter("REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
                 [&cluster_info = _cluster_info] { report_task_callback(cluster_info); });

    add_reporter("REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds,
                 [&engine, &cluster_info = _cluster_info] {
                     report_disk_callback(engine, cluster_info);
                 });

    // Tablet reporting is optional in cloud mode.
    if (config::enable_cloud_tablet_report) {
        add_reporter("REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,
                     [&engine, &cluster_info = _cluster_info] {
                         report_tablet_callback(engine, cluster_info);
                     });
    }

    add_reporter("REPORT_INDEX_POLICY", _cluster_info,
                 config::report_index_policy_interval_seconds,
                 [&cluster_info = _cluster_info] { report_index_policy_callback(cluster_info); });
}
265
266
// TODO(lingbin): each task in the batch may have it own status or FE must check and
267
// resend request when something is wrong(BE may need some logic to guarantee idempotence.
268
void AgentServer::submit_tasks(TAgentResult& agent_result,
269
6.48k
                               const std::vector<TAgentTaskRequest>& tasks) {
270
6.48k
    Status ret_st;
271
272
    // TODO check cluster_info here if it is the same with that of heartbeat rpc
273
6.48k
    if (_cluster_info->master_fe_addr.hostname.empty() || _cluster_info->master_fe_addr.port == 0) {
274
0
        Status st = Status::Cancelled("Have not get FE Master heartbeat yet");
275
0
        st.to_thrift(&agent_result.status);
276
0
        return;
277
0
    }
278
279
22.9k
    for (auto&& task : tasks) {
280
22.9k
        VLOG_RPC << "submit one task: " << apache::thrift::ThriftDebugString(task).c_str();
281
22.9k
        auto task_type = task.task_type;
282
22.9k
        if (task_type == TTaskType::REALTIME_PUSH) {
283
22
            task_type = TTaskType::PUSH;
284
22
        }
285
22.9k
        int64_t signature = task.signature;
286
22.9k
        if (auto it = _workers.find(task_type); it != _workers.end()) {
287
22.9k
            auto& worker = it->second;
288
22.9k
            ret_st = worker->submit_task(task);
289
22.9k
        } else {
290
0
            ret_st = Status::InvalidArgument("task(signature={}, type={}) has wrong task type",
291
0
                                             signature, task.task_type);
292
0
        }
293
294
22.9k
        if (!ret_st.ok()) {
295
0
            LOG_WARNING("failed to submit task").tag("task", task).error(ret_st);
296
            // For now, all tasks in the batch share one status, so if any task
297
            // was failed to submit, we can only return error to FE(even when some
298
            // tasks have already been successfully submitted).
299
            // However, Fe does not check the return status of submit_tasks() currently,
300
            // and it is not sure that FE will retry when something is wrong, so here we
301
            // only print an warning log and go on(i.e. do not break current loop),
302
            // to ensure every task can be submitted once. It is OK for now, because the
303
            // ret_st can be error only when it encounters an wrong task_type and
304
            // req-member in TAgentTaskRequest, which is basically impossible.
305
            // TODO(lingbin): check the logic in FE again later.
306
0
        }
307
22.9k
    }
308
309
6.48k
    ret_st.to_thrift(&agent_result.status);
310
6.48k
}
311
312
void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
313
0
                                        const TAgentPublishRequest& request) {
314
0
    Status status = Status::NotSupported("deprecated method(publish_cluster_state) was invoked");
315
0
    status.to_thrift(&t_agent_result.status);
316
0
}
317
318
0
void AgentServer::stop_report_workers() {
319
0
    for (auto& work : _report_workers) {
320
0
        work->stop();
321
0
    }
322
0
}
323
324
} // namespace doris