Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_backend_service.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_backend_service.h"
19
20
#include <brpc/controller.h>
21
22
#include "cloud/cloud_storage_engine.h"
23
#include "cloud/cloud_tablet.h"
24
#include "cloud/cloud_tablet_hotspot.h"
25
#include "cloud/cloud_tablet_mgr.h"
26
#include "cloud/cloud_warm_up_manager.h"
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "common/status.h"
30
#include "io/cache/block_file_cache_downloader.h"
31
#include "io/cache/block_file_cache_factory.h"
32
#include "load/stream_load/stream_load_context.h"
33
#include "load/stream_load/stream_load_recorder.h"
34
#include "util/brpc_client_cache.h" // BrpcClientCache
35
#include "util/stack_util.h"
36
#include "util/thrift_server.h"
37
38
namespace doris {
39
40
// bvar counters exported for monitoring the async file-cache warm-up path:
// segments fetched via RPC, tasks submitted to the pool, and tablets requested.
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_segment_num(
        "file_cache_warm_up_cache_async_submitted_segment_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_task_num(
        "file_cache_warm_up_cache_async_submitted_task_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_cache_async_submitted_tablet_num(
        "file_cache_warm_up_cache_async_submitted_tablet_num");
46
47
// Binds this service to the cloud storage engine; all other state lives in the base.
CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env)
        : BaseBackendService(exec_env), _engine(engine) {}
49
50
// Defaulted out-of-line so the header does not need complete member types.
CloudBackendService::~CloudBackendService() = default;
51
52
/// Starts the cloud agent workers on `service` and publishes it behind a new
/// ThriftServer listening on `port`. On success `*server` owns the server.
Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
                                           std::unique_ptr<ThriftServer>* server,
                                           std::shared_ptr<doris::CloudBackendService> service) {
    // Bring up the cloud-mode agent workers before the thrift endpoint is exposed.
    service->_agent_server->cloud_start_workers(engine, exec_env);

    // Wrap the service in a thrift processor and hand it to the server.
    // TODO: do we want a BoostThreadFactory?
    // TODO: we want separate thread factories here, so that fe requests can't starve
    // be requests
    auto processor = std::make_shared<BackendServiceProcessor>(service);
    *server = std::make_unique<ThriftServer>("backend", processor, port,
                                             config::be_service_threads);
    LOG(INFO) << "Doris CloudBackendService listening on " << port;
    return Status::OK();
}
69
70
void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
71
0
                                                const TSyncLoadForTabletsRequest& request) {
72
0
    auto f = [this, tablet_ids = request.tablet_ids]() {
73
0
        std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t tablet_id) {
74
0
            CloudTabletSPtr tablet;
75
0
            auto result = _engine.tablet_mgr().get_tablet(tablet_id, true);
76
0
            if (!result.has_value()) {
77
0
                return;
78
0
            }
79
0
            SyncOptions options;
80
0
            options.warmup_delta_data = true;
81
0
            Status st = result.value()->sync_rowsets(options);
82
0
            if (!st.ok()) {
83
0
                LOG_WARNING("failed to sync load for tablet").error(st);
84
0
            }
85
0
        });
86
0
    };
87
0
    static_cast<void>(_engine.sync_load_for_tablets_thread_pool().submit_func(std::move(f)));
88
0
}
89
90
void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
91
1
                                                   const TGetTopNHotPartitionsRequest& request) {
92
1
    _engine.tablet_hotspot().get_top_n_hot_partition(&response.hot_tables);
93
1
    response.file_cache_size = io::FileCacheFactory::instance()->get_capacity();
94
1
    response.__isset.hot_tables = !response.hot_tables.empty();
95
1
}
96
97
/// Dispatches FE-driven warm-up job control requests to the CloudWarmUpManager.
/// The final status of whichever operation ran is written into `response.status`.
/// Note: a successful SET_JOB intentionally falls through to SET_BATCH so a new
/// job's first batch is registered in the same RPC.
void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
                                          const TWarmUpTabletsRequest& request) {
    Status st;
    auto& manager = _engine.cloud_warm_up_manager();
    switch (request.type) {
    case TWarmUpTabletsRequestType::SET_JOB: {
        LOG_INFO("receive the warm up request.")
                .tag("request_type", "SET_JOB")
                .tag("job_id", request.job_id);
        if (request.__isset.event) {
            // Event-style SET_JOB: on success we are done (no fallthrough).
            st = manager.set_event(request.job_id, request.event);
            if (st.ok()) {
                break;
            }
        } else {
            st = manager.check_and_set_job_id(request.job_id);
        }
        if (!st.ok()) {
            LOG_WARNING("SET_JOB failed.").error(st);
            break;
        }
        // Job id registered successfully: continue into SET_BATCH below.
        [[fallthrough]];
    }
    case TWarmUpTabletsRequestType::SET_BATCH: {
        LOG_INFO("receive the warm up request.")
                .tag("request_type", "SET_BATCH")
                .tag("job_id", request.job_id)
                .tag("batch_id", request.batch_id)
                .tag("jobs size", request.job_metas.size())
                .tag("tablet num of first meta",
                     request.job_metas.empty() ? 0 : request.job_metas[0].tablet_ids.size());
        bool retry = false;
        st = manager.check_and_set_batch_id(request.job_id, request.batch_id, &retry);
        // Only enqueue the job metas when the batch id was accepted and this
        // is not a retry of an already-seen batch.
        if (!retry && st) {
            manager.add_job(request.job_metas);
        } else {
            if (retry) {
                LOG_WARNING("retry the job.")
                        .tag("job_id", request.job_id)
                        .tag("batch_id", request.batch_id);
            } else {
                LOG_WARNING("SET_BATCH failed.").error(st);
            }
        }
        break;
    }
    case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: {
        // Snapshot the manager's current job state and echo it back to the FE.
        auto [job_id, batch_id, pending_job_size, finish_job_size] =
                manager.get_current_job_state();
        LOG_INFO("receive the warm up request.")
                .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE")
                .tag("job_id", job_id)
                .tag("batch_id", batch_id)
                .tag("pending_job_size", pending_job_size)
                .tag("finish_job_size", finish_job_size);
        response.__set_job_id(job_id);
        response.__set_batch_id(batch_id);
        response.__set_pending_job_size(pending_job_size);
        response.__set_finish_job_size(finish_job_size);
        break;
    }
    case TWarmUpTabletsRequestType::CLEAR_JOB: {
        LOG_INFO("receive the warm up request.")
                .tag("request_type", "CLEAR_JOB")
                .tag("job_id", request.job_id);
        if (request.__isset.event) {
            // Event-style clear: set_event with clear=true.
            st = manager.set_event(request.job_id, request.event, /* clear: */ true);
        } else {
            st = manager.clear_job(request.job_id);
        }
        break;
    }
    default:
        // Unknown request type: programmer error on the sender side.
        DCHECK(false);
    };
    st.to_thrift(&response.status);
}
174
175
/// Synchronous brpc call that fetches file-cache block metadata for the tablets
/// in `brpc_request` from the peer BE at `brpc_addr`. On success the metas are
/// in `brpc_response` and the segment counter metric is bumped.
static Status run_rpc_get_file_cache_meta(std::shared_ptr<PBackendService_Stub> brpc_stub,
                                          const std::string& brpc_addr,
                                          PGetFileCacheMetaRequest brpc_request,
                                          PGetFileCacheMetaResponse& brpc_response) {
    brpc::Controller controller;
    controller.set_timeout_ms(20 * 1000); // 20s
    // Null done-callback makes the call synchronous.
    brpc_stub->get_file_cache_meta_by_tablet_id(&controller, &brpc_request, &brpc_response,
                                                nullptr);
    if (controller.Failed()) {
        LOG(WARNING) << "warm_up_cache_async: brpc call failed, addr=" << brpc_addr
                     << ", error=" << controller.ErrorText()
                     << ", error code=" << controller.ErrorCode();
        return Status::RpcError("{} isn't connected, error code={}", brpc_addr,
                                controller.ErrorCode());
    }
    VLOG_DEBUG << "warm_up_cache_async: request=" << brpc_request.DebugString()
               << ", response=" << brpc_response.DebugString();
    // Account the number of segment metas we just fetched.
    g_file_cache_warm_up_cache_async_submitted_segment_num
            << brpc_response.file_cache_block_metas().size();
    return Status::OK();
}
193
194
/// Heavy body of warm_up_cache_async: records the balanced tablets, resolves
/// the source BE address, fetches its file-cache block metas over brpc, and
/// hands them to the downloader. NOTE: `response` is never written here — the
/// caller reports only the submission status.
void CloudBackendService::_warm_up_cache(TWarmUpCacheAsyncResponse& response,
                                         const TWarmUpCacheAsyncRequest& request) {
    // Build a log-friendly preview of at most the first 10 tablet ids.
    std::ostringstream oss;
    oss << "[";
    for (size_t i = 0; i < request.tablet_ids.size() && i < 10; ++i) {
        if (i > 0) oss << ",";
        oss << request.tablet_ids[i];
    }
    oss << "]";
    g_file_cache_warm_up_cache_async_submitted_tablet_num << request.tablet_ids.size();
    LOG(INFO) << "warm_up_cache_async: enter, request=" << request.host << ":" << request.brpc_port
              << ", tablets num=" << request.tablet_ids.size() << ", tablet_ids=" << oss.str();

    auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
    // Record each tablet in manager
    for (int64_t tablet_id : request.tablet_ids) {
        manager.record_balanced_tablet(tablet_id, request.host, request.brpc_port);
    }

    // Resolve the hostname to an IP via the DNS cache unless it already is one.
    std::string host = request.host;
    auto* dns_cache = ExecEnv::GetInstance()->dns_cache();
    if (dns_cache == nullptr) {
        LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
    } else if (!is_valid_ip(request.host)) {
        Status status = dns_cache->get(request.host, &host);
        if (!status.ok()) {
            // Cannot reach the source BE without an address; give up silently
            // (response is not used to report this failure).
            LOG(WARNING) << "failed to get ip from host " << request.host << ": "
                         << status.to_string();
            return;
        }
    }
    std::string brpc_addr = get_host_port(host, request.brpc_port);
    // Fresh (uncached) client: presumably to avoid reusing a stale channel
    // after balance moves — TODO confirm.
    std::shared_ptr<PBackendService_Stub> brpc_stub =
            _exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_addr);
    if (!brpc_stub) {
        LOG(WARNING) << "warm_up_cache_async: failed to get brpc_stub for addr " << brpc_addr;
        return;
    }
    // Ask the source BE for the cache block metas of all requested tablets.
    PGetFileCacheMetaRequest brpc_request;
    PGetFileCacheMetaResponse brpc_response;
    for (int64_t tablet_id : request.tablet_ids) {
        brpc_request.add_tablet_ids(tablet_id);
    }

    Status rpc_status = run_rpc_get_file_cache_meta(brpc_stub, brpc_addr, std::move(brpc_request),
                                                    brpc_response);
    if (rpc_status.ok()) {
        // Move the fetched metas straight into the downloader's task queue.
        _engine.file_cache_block_downloader().submit_download_task(
                std::move(*brpc_response.mutable_file_cache_block_metas()));
    } else {
        LOG(WARNING) << "warm_up_cache_async: rpc failed for addr=" << brpc_addr
                     << ", status=" << rpc_status;
    }
}
248
249
void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
250
0
                                              const TWarmUpCacheAsyncRequest& request) {
251
    // just submit the task to the thread pool, no need to wait for the result
252
0
    auto do_warm_up = [this, request, &response]() { this->_warm_up_cache(response, request); };
253
0
    g_file_cache_warm_up_cache_async_submitted_task_num << 1;
254
0
    Status submit_st = _engine.warmup_cache_async_thread_pool().submit_func(std::move(do_warm_up));
255
0
    if (!submit_st.ok()) {
256
0
        LOG(WARNING) << "warm_up_cache_async: fail to submit heavy task to "
257
0
                        "warmup_cache_async_thread_pool, status="
258
0
                     << submit_st.to_string() << ", execute synchronously";
259
0
        do_warm_up();
260
0
    }
261
0
    TStatus t_status;
262
0
    submit_st.to_thrift(&t_status);
263
0
    response.status = std::move(t_status);
264
0
}
265
266
void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
267
0
                                                    const TCheckWarmUpCacheAsyncRequest& request) {
268
0
    std::ostringstream oss;
269
0
    oss << "[";
270
0
    for (size_t i = 0; i < request.tablets.size() && i < 10; ++i) {
271
0
        if (i > 0) oss << ",";
272
0
        oss << request.tablets[i];
273
0
    }
274
0
    oss << "]";
275
0
    LOG(INFO) << "check_warm_up_cache_async: enter, request tablets num=" << request.tablets.size()
276
0
              << ", tablet_ids=" << oss.str();
277
0
    std::map<int64_t, bool> task_done;
278
0
    _engine.file_cache_block_downloader().check_download_task(request.tablets, &task_done);
279
0
    DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false", {
280
0
        for (auto& it : task_done) {
281
0
            it.second = false;
282
0
        }
283
0
    });
284
0
    response.__set_task_done(task_done);
285
286
0
    for (const auto& [tablet_id, done] : task_done) {
287
0
        VLOG_DEBUG << "check_warm_up_cache_async: tablet_id=" << tablet_id << ", done=" << done;
288
0
    }
289
290
0
    Status st = Status::OK();
291
0
    TStatus t_status;
292
0
    st.to_thrift(&t_status);
293
0
    response.status = t_status;
294
0
}
295
296
/// Thin forwarder: delegates to the base implementation, supplying the
/// cloud engine's stream-load recorder as the record source.
void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
                                                 int64_t last_stream_record_time) {
    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
                                               _engine.get_stream_load_recorder());
}
301
302
} // namespace doris