Coverage Report

Created: 2024-11-20 19:28

/root/doris/be/src/service/http_service.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/http_service.h"
19
20
#include <event2/bufferevent.h>
21
#include <event2/http.h>
22
#include <gen_cpp/FrontendService_types.h>
23
24
#include <string>
25
#include <vector>
26
27
#include "common/config.h"
28
#include "common/status.h"
29
#include "http/action/adjust_log_level.h"
30
#include "http/action/adjust_tracing_dump.h"
31
#include "http/action/calc_file_crc_action.h"
32
#include "http/action/check_rpc_channel_action.h"
33
#include "http/action/check_tablet_segment_action.h"
34
#include "http/action/checksum_action.h"
35
#include "http/action/clear_cache_action.h"
36
#include "http/action/compaction_action.h"
37
#include "http/action/compaction_score_action.h"
38
#include "http/action/config_action.h"
39
#include "http/action/debug_point_action.h"
40
#include "http/action/download_action.h"
41
#include "http/action/download_binlog_action.h"
42
#include "http/action/file_cache_action.h"
43
#include "http/action/health_action.h"
44
#include "http/action/http_stream.h"
45
#include "http/action/jeprofile_actions.h"
46
#include "http/action/load_stream_action.h"
47
#include "http/action/meta_action.h"
48
#include "http/action/metrics_action.h"
49
#include "http/action/pad_rowset_action.h"
50
#include "http/action/pipeline_task_action.h"
51
#include "http/action/pprof_actions.h"
52
#include "http/action/reload_tablet_action.h"
53
#include "http/action/report_action.h"
54
#include "http/action/reset_rpc_channel_action.h"
55
#include "http/action/restore_tablet_action.h"
56
#include "http/action/show_nested_index_file_action.h"
57
#include "http/action/snapshot_action.h"
58
#include "http/action/stream_load.h"
59
#include "http/action/stream_load_2pc.h"
60
#include "http/action/tablet_migration_action.h"
61
#include "http/action/tablets_distribution_action.h"
62
#include "http/action/tablets_info_action.h"
63
#include "http/action/version_action.h"
64
#include "http/default_path_handlers.h"
65
#include "http/ev_http_server.h"
66
#include "http/http_method.h"
67
#include "http/web_page_handler.h"
68
#include "olap/options.h"
69
#include "runtime/exec_env.h"
70
#include "runtime/load_path_mgr.h"
71
#include "util/doris_metrics.h"
72
73
namespace doris {
74
namespace {
75
1
std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* event_base) {
76
1
    auto rate_limit = config::download_binlog_rate_limit_kbs;
77
1
    if (rate_limit <= 0) {
78
1
        return nullptr;
79
1
    }
80
81
0
    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
82
0
    if (rate_limit > max_value) {
83
0
        LOG(WARNING) << "rate limit is too large, set to max value.";
84
0
        rate_limit = max_value;
85
0
    }
86
0
    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
87
0
    rate_limit = rate_limit / 10 * 1024;       // convert to KB/S
88
89
0
    auto token_bucket = std::unique_ptr<ev_token_bucket_cfg, decltype(&ev_token_bucket_cfg_free)>(
90
0
            ev_token_bucket_cfg_new(rate_limit, rate_limit * 2, rate_limit, rate_limit * 2,
91
0
                                    &cfg_tick),
92
0
            ev_token_bucket_cfg_free);
93
0
    return std::shared_ptr<bufferevent_rate_limit_group>(
94
0
            bufferevent_rate_limit_group_new(event_base, token_bucket.get()),
95
0
            bufferevent_rate_limit_group_free);
96
1
}
97
} // namespace
98
99
HttpService::HttpService(ExecEnv* env, int port, int num_threads)
100
        : _env(env),
101
          _ev_http_server(new EvHttpServer(port, num_threads)),
102
1
          _web_page_handler(new WebPageHandler(_ev_http_server.get())) {}
103
104
1
HttpService::~HttpService() {
105
1
    stop();
106
1
}
107
108
// NOLINTBEGIN(readability-function-size)
109
1
Status HttpService::start() {
110
1
    add_default_path_handlers(_web_page_handler.get());
111
112
1
    auto event_base = _ev_http_server->get_event_bases()[0];
113
1
    _rate_limit_group = get_rate_limit_group(event_base.get());
114
115
    // register load
116
1
    StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
117
1
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
118
1
                                      streamload_action);
119
1
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load",
120
1
                                      streamload_action);
121
1
    StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env));
122
1
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/_stream_load_2pc",
123
1
                                      streamload_2pc_action);
124
1
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load_2pc",
125
1
                                      streamload_2pc_action);
126
127
    // register http_stream
128
1
    HttpStreamAction* http_stream_action = _pool.add(new HttpStreamAction(_env));
129
1
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/_http_stream", http_stream_action);
130
131
    // register download action
132
1
    std::vector<std::string> allow_paths;
133
1
    for (auto& path : _env->store_paths()) {
134
0
        allow_paths.emplace_back(path.path);
135
0
    }
136
1
    DownloadAction* download_action = _pool.add(new DownloadAction(_env, nullptr, allow_paths));
137
1
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action);
138
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action);
139
140
1
    DownloadAction* tablet_download_action =
141
1
            _pool.add(new DownloadAction(_env, _rate_limit_group, allow_paths));
142
1
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download",
143
1
                                      tablet_download_action);
144
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download",
145
1
                                      tablet_download_action);
146
1
    if (config::enable_single_replica_load) {
147
1
        DownloadAction* single_replica_download_action = _pool.add(new DownloadAction(
148
1
                _env, nullptr, allow_paths, config::single_replica_load_download_num_workers));
149
1
        _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_single_replica/_download",
150
1
                                          single_replica_download_action);
151
1
        _ev_http_server->register_handler(HttpMethod::GET, "/api/_single_replica/_download",
152
1
                                          single_replica_download_action);
153
1
    }
154
155
1
    DownloadAction* error_log_download_action =
156
1
            _pool.add(new DownloadAction(_env, _env->load_path_mgr()->get_load_error_file_dir()));
157
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_load_error_log",
158
1
                                      error_log_download_action);
159
1
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log",
160
1
                                      error_log_download_action);
161
162
1
    DownloadBinlogAction* download_binlog_action =
163
1
            _pool.add(new DownloadBinlogAction(_env, _rate_limit_group));
164
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_binlog/_download",
165
1
                                      download_binlog_action);
166
1
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_binlog/_download",
167
1
                                      download_binlog_action);
168
169
1
    AdjustLogLevelAction* adjust_log_level_action = _pool.add(new AdjustLogLevelAction());
170
1
    _ev_http_server->register_handler(HttpMethod::POST, "api/glog/adjust", adjust_log_level_action);
171
172
    //TODO: add query GET interface
173
1
    auto* adjust_tracing_dump = _pool.add(new AdjustTracingDump());
174
1
    _ev_http_server->register_handler(HttpMethod::POST, "api/pipeline/tracing",
175
1
                                      adjust_tracing_dump);
176
177
    // Register BE version action
178
1
    VersionAction* version_action =
179
1
            _pool.add(new VersionAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::NONE));
180
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/be_version_info", version_action);
181
182
    // Register BE health action
183
1
    HealthAction* health_action = _pool.add(new HealthAction());
184
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action);
185
186
    // Clear cache action
187
1
    ClearCacheAction* clear_cache_action = _pool.add(new ClearCacheAction());
188
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/clear_cache/{type}",
189
1
                                      clear_cache_action);
190
191
    // Dump all running pipeline tasks
192
1
    PipelineTaskAction* pipeline_task_action = _pool.add(new PipelineTaskAction());
193
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks",
194
1
                                      pipeline_task_action);
195
196
    // Dump all running pipeline tasks which has been running for more than {duration} seconds
197
1
    LongPipelineTaskAction* long_pipeline_task_action = _pool.add(new LongPipelineTaskAction());
198
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks/{duration}",
199
1
                                      long_pipeline_task_action);
200
201
    // Register BE LoadStream action
202
1
    LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
203
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);
204
205
1
    QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new QueryPipelineTaskAction());
206
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/query_pipeline_tasks/{query_id}",
207
1
                                      query_pipeline_task_action);
208
209
    // Register Tablets Info action
210
1
    TabletsInfoAction* tablets_info_action =
211
1
            _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
212
1
    _ev_http_server->register_handler(HttpMethod::GET, "/tablets_json", tablets_info_action);
213
214
    // Register Tablets Distribution action
215
1
    TabletsDistributionAction* tablets_distribution_action = _pool.add(
216
1
            new TabletsDistributionAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
217
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablets_distribution",
218
1
                                      tablets_distribution_action);
219
220
    // Register tablet migration action
221
1
    TabletMigrationAction* tablet_migration_action = _pool.add(
222
1
            new TabletMigrationAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
223
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/tablet_migration",
224
1
                                      tablet_migration_action);
225
226
    // register pprof actions
227
1
    static_cast<void>(PprofActions::setup(_env, _ev_http_server.get(), _pool));
228
229
    // register jeprof actions
230
1
    static_cast<void>(JeprofileActions::setup(_env, _ev_http_server.get(), _pool));
231
232
    // register metrics
233
1
    {
234
1
        auto* action =
235
1
                _pool.add(new MetricsAction(DorisMetrics::instance()->metric_registry(), _env,
236
1
                                            TPrivilegeHier::GLOBAL, TPrivilegeType::NONE));
237
1
        _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action);
238
1
    }
239
240
1
    MetaAction* meta_action =
241
1
            _pool.add(new MetaAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
242
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/{op}/{tablet_id}", meta_action);
243
244
1
    FileCacheAction* file_cache_action = _pool.add(new FileCacheAction());
245
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/file_cache", file_cache_action);
246
247
#ifndef BE_TEST
248
    // Register BE checksum action
249
    ChecksumAction* checksum_action =
250
            _pool.add(new ChecksumAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
251
    _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action);
252
253
    // Register BE reload tablet action
254
    ReloadTabletAction* reload_tablet_action =
255
            _pool.add(new ReloadTabletAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
256
    _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action);
257
258
    RestoreTabletAction* restore_tablet_action =
259
            _pool.add(new RestoreTabletAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
260
    _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet",
261
                                      restore_tablet_action);
262
263
    // Register BE snapshot action
264
    SnapshotAction* snapshot_action =
265
            _pool.add(new SnapshotAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
266
    _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action);
267
268
    CompactionScoreAction* compaction_score_action =
269
            _pool.add(new CompactionScoreAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN,
270
                                                _env->get_storage_engine()->tablet_manager()));
271
    _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score",
272
                                      compaction_score_action);
273
#endif
274
275
    // 2 compaction actions
276
1
    CompactionAction* show_compaction_action = _pool.add(new CompactionAction(
277
1
            CompactionActionType::SHOW_INFO, _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
278
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/show",
279
1
                                      show_compaction_action);
280
1
    CompactionAction* run_compaction_action =
281
1
            _pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION, _env,
282
1
                                           TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
283
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/compaction/run",
284
1
                                      run_compaction_action);
285
1
    CompactionAction* run_status_compaction_action =
286
1
            _pool.add(new CompactionAction(CompactionActionType::RUN_COMPACTION_STATUS, _env,
287
1
                                           TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
288
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status",
289
1
                                      run_status_compaction_action);
290
291
1
    ConfigAction* update_config_action =
292
1
            _pool.add(new ConfigAction(ConfigActionType::UPDATE_CONFIG));
293
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action);
294
295
1
    ConfigAction* show_config_action = _pool.add(new ConfigAction(ConfigActionType::SHOW_CONFIG));
296
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/show_config", show_config_action);
297
298
    // 3 check action
299
1
    CheckRPCChannelAction* check_rpc_channel_action = _pool.add(
300
1
            new CheckRPCChannelAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
301
1
    _ev_http_server->register_handler(HttpMethod::GET,
302
1
                                      "/api/check_rpc_channel/{ip}/{port}/{payload_size}",
303
1
                                      check_rpc_channel_action);
304
305
1
    ResetRPCChannelAction* reset_rpc_channel_action = _pool.add(
306
1
            new ResetRPCChannelAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
307
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/reset_rpc_channel/{endpoints}",
308
1
                                      reset_rpc_channel_action);
309
310
1
    CheckTabletSegmentAction* check_tablet_segment_action = _pool.add(
311
1
            new CheckTabletSegmentAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
312
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/check_tablet_segment_lost",
313
1
                                      check_tablet_segment_action);
314
315
1
    PadRowsetAction* pad_rowset_action =
316
1
            _pool.add(new PadRowsetAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
317
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/pad_rowset", pad_rowset_action);
318
319
    // debug point
320
1
    AddDebugPointAction* add_debug_point_action =
321
1
            _pool.add(new AddDebugPointAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
322
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/debug_point/add/{debug_point}",
323
1
                                      add_debug_point_action);
324
325
1
    RemoveDebugPointAction* remove_debug_point_action = _pool.add(
326
1
            new RemoveDebugPointAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
327
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/debug_point/remove/{debug_point}",
328
1
                                      remove_debug_point_action);
329
330
1
    ClearDebugPointsAction* clear_debug_points_action = _pool.add(
331
1
            new ClearDebugPointsAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
332
1
    _ev_http_server->register_handler(HttpMethod::POST, "/api/debug_point/clear",
333
1
                                      clear_debug_points_action);
334
335
1
    ReportAction* report_tablet_action = _pool.add(new ReportAction(
336
1
            _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_OLAP_TABLE"));
337
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/report/tablet", report_tablet_action);
338
339
1
    ReportAction* report_disk_action = _pool.add(new ReportAction(
340
1
            _env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_DISK_STATE"));
341
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/report/disk", report_disk_action);
342
343
1
    CalcFileCrcAction* calc_crc_action =
344
1
            _pool.add(new CalcFileCrcAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
345
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/calc_crc", calc_crc_action);
346
347
1
    ShowNestedIndexFileAction* show_nested_index_file_action = _pool.add(
348
1
            new ShowNestedIndexFileAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
349
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/show_nested_index_file",
350
1
                                      show_nested_index_file_action);
351
352
1
    ReportAction* report_task_action = _pool.add(
353
1
            new ReportAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, "REPORT_TASK"));
354
1
    _ev_http_server->register_handler(HttpMethod::GET, "/api/report/task", report_task_action);
355
356
1
    _ev_http_server->start();
357
1
    return Status::OK();
358
1
}
359
// NOLINTEND(readability-function-size)
360
361
1
void HttpService::stop() {
362
1
    if (stopped) {
363
0
        return;
364
0
    }
365
1
    _ev_http_server->stop();
366
1
    _pool.clear();
367
1
    stopped = true;
368
1
}
369
370
1
int HttpService::get_real_port() const {
371
1
    return _ev_http_server->get_real_port();
372
1
}
373
374
} // namespace doris