Coverage Report

Created: 2026-03-12 16:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/service/backend_service.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "service/backend_service.h"
19
20
#include <absl/strings/str_split.h>
21
#include <arrow/record_batch.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/BackendService.h>
24
#include <gen_cpp/BackendService_types.h>
25
#include <gen_cpp/Data_types.h>
26
#include <gen_cpp/DorisExternalService_types.h>
27
#include <gen_cpp/FrontendService_types.h>
28
#include <gen_cpp/Metrics_types.h>
29
#include <gen_cpp/PaloInternalService_types.h>
30
#include <gen_cpp/Planner_types.h>
31
#include <gen_cpp/Status_types.h>
32
#include <gen_cpp/Types_types.h>
33
#include <sys/types.h>
34
#include <thrift/concurrency/ThreadFactory.h>
35
#include <thrift/protocol/TDebugProtocol.h>
36
#include <time.h>
37
38
#include <cstdint>
39
#include <map>
40
#include <memory>
41
#include <ostream>
42
#include <ranges>
43
#include <string>
44
#include <thread>
45
#include <utility>
46
#include <vector>
47
48
#include "absl/strings/substitute.h"
49
#include "cloud/config.h"
50
#include "common/config.h"
51
#include "common/logging.h"
52
#include "common/status.h"
53
#include "exprs/function/dictionary_factory.h"
54
#include "format/arrow/arrow_row_batch.h"
55
#include "io/fs/connectivity/storage_connectivity_tester.h"
56
#include "io/fs/local_file_system.h"
57
#include "load/routine_load/routine_load_task_executor.h"
58
#include "load/stream_load/stream_load_context.h"
59
#include "load/stream_load/stream_load_recorder.h"
60
#include "runtime/exec_env.h"
61
#include "runtime/external_scan_context_mgr.h"
62
#include "runtime/fragment_mgr.h"
63
#include "runtime/result_queue_mgr.h"
64
#include "runtime/runtime_profile.h"
65
#include "service/http/http_client.h"
66
#include "storage/olap_common.h"
67
#include "storage/olap_define.h"
68
#include "storage/rowset/beta_rowset.h"
69
#include "storage/rowset/pending_rowset_helper.h"
70
#include "storage/rowset/rowset_factory.h"
71
#include "storage/rowset/rowset_meta.h"
72
#include "storage/snapshot/snapshot_manager.h"
73
#include "storage/storage_engine.h"
74
#include "storage/tablet/tablet_manager.h"
75
#include "storage/tablet/tablet_meta.h"
76
#include "storage/txn/txn_manager.h"
77
#include "udf/python/python_env.h"
78
#include "util/defer_op.h"
79
#include "util/threadpool.h"
80
#include "util/thrift_server.h"
81
#include "util/uid_util.h"
82
#include "util/url_coding.h"
83
84
// Forward declarations of the Thrift types referenced in this file, so the
// declarations are visible without relying on a particular Thrift header.
namespace apache::thrift {

class TException;
class TMultiplexedProcessor;
class TProcessor;

namespace transport {
class TTransportException;
} // namespace transport

} // namespace apache::thrift
94
95
namespace doris {
96
#include "common/compile_check_begin.h"
97
namespace {
98
99
// Latency recorder for binlog ingestion. _ingest_binlog streams the elapsed
// time of each run, in microseconds, into it from its deferred epilogue.
bvar::LatencyRecorder g_ingest_binlog_latency("doris_backend_service", "ingest_binlog");
100
101
struct IngestBinlogArg {
102
    int64_t txn_id;
103
    int64_t partition_id;
104
    int64_t local_tablet_id;
105
    TabletSharedPtr local_tablet;
106
    TIngestBinlogRequest request;
107
    TStatus* tstatus;
108
};
109
110
// Dispatches `callback` to the appropriate HttpClient retry entry point.
//
// If `client` is engaged (the caller keeps one instance around, see
// _ingest_binlog), the request goes through that instance's execute();
// otherwise the static HttpClient::execute_with_retry() is used.
// `retry_times` and `sleep_time` are forwarded unchanged in both cases.
//
// Returns whatever Status the chosen entry point yields.
Status _exec_http_req(std::optional<HttpClient>& client, int retry_times, int sleep_time,
                      const std::function<Status(HttpClient*)>& callback) {
    if (!client.has_value()) {
        // No caller-owned client: use the static retry helper.
        return HttpClient::execute_with_retry(retry_times, sleep_time, callback);
    }
    return client->execute(retry_times, sleep_time, callback);
}
118
119
// Downloads one binlog segment file and validates the local copy.
//
// Parameters:
//   client                  HTTP client used for the transfer (must not be null).
//   get_segment_file_url    URL the segment file is fetched from.
//   segment_path            Local destination path.
//   segment_file_size       Size the downloaded file is expected to have.
//   estimate_timeout        Transfer timeout; multiplied by 1000 before being
//                           passed to set_timeout_ms().
//   download_success_files  Receives `segment_path` once the download call
//                           has succeeded.
//
// Returns the first failing Status from the client or filesystem calls,
// IOError if the local size cannot be read, RuntimeError on a size or MD5
// mismatch, and otherwise the result of setting PERMS_OWNER_RW on the file.
Status _download_binlog_segment_file(HttpClient* client, const std::string& get_segment_file_url,
                                     const std::string& segment_path, uint64_t segment_file_size,
                                     uint64_t estimate_timeout,
                                     std::vector<std::string>& download_success_files) {
    RETURN_IF_ERROR(client->init(get_segment_file_url));
    client->set_timeout_ms(estimate_timeout * 1000);
    RETURN_IF_ERROR(client->download(segment_path));
    // Track the file as soon as it exists locally, before any validation, so
    // that a file failing the checks below is still on the caller's list.
    download_success_files.push_back(segment_path);

    std::string remote_file_md5;
    RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));
    LOG(INFO) << "download segment file to " << segment_path << ", remote md5: " << remote_file_md5
              << ", remote size: " << segment_file_size;

    std::error_code ec;
    // Check file length
    uint64_t local_file_size = std::filesystem::file_size(segment_path, ec);
    if (ec) {
        LOG(WARNING) << "download file error: " << ec.message();
        return Status::IOError("can't retrieve file_size of {}, due to {}", segment_path,
                               ec.message());
    }

    if (local_file_size != segment_file_size) {
        LOG(WARNING) << "download file length error"
                     << ", get_segment_file_url=" << get_segment_file_url
                     << ", file_size=" << segment_file_size
                     << ", local_file_size=" << local_file_size;
        return Status::RuntimeError("downloaded file size is not equal, local={}, remote={}",
                                    local_file_size, segment_file_size);
    }

    // An empty remote MD5 skips the content check (keeps compatibility with
    // peers that do not report one).
    if (!remote_file_md5.empty()) {
        std::string local_file_md5;
        RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(segment_path, &local_file_md5));
        if (local_file_md5 != remote_file_md5) {
            LOG(WARNING) << "download file md5 error"
                         << ", get_segment_file_url=" << get_segment_file_url
                         << ", remote_file_md5=" << remote_file_md5
                         << ", local_file_md5=" << local_file_md5;
            return Status::RuntimeError("download file md5 is not equal, local={}, remote={}",
                                        local_file_md5, remote_file_md5);
        }
    }

    return io::global_local_filesystem()->permission(segment_path,
                                                     io::LocalFileSystem::PERMS_OWNER_RW);
}
167
168
// Downloads one binlog segment index file and validates the local copy.
//
// Parameters:
//   client                      HTTP client used for the transfer (must not be null).
//   get_segment_index_file_url  URL the index file is fetched from.
//   local_segment_index_path    Local destination path.
//   segment_index_file_size     Size the downloaded file is expected to have.
//   estimate_timeout            Transfer timeout; multiplied by 1000 before
//                               being passed to set_timeout_ms().
//   download_success_files      Receives `local_segment_index_path` once the
//                               download call has succeeded.
//
// Returns the first failing Status from the client or filesystem calls,
// IOError if the local size cannot be read, RuntimeError on a size or MD5
// mismatch, and otherwise the result of setting PERMS_OWNER_RW on the file.
Status _download_binlog_index_file(HttpClient* client,
                                   const std::string& get_segment_index_file_url,
                                   const std::string& local_segment_index_path,
                                   uint64_t segment_index_file_size, uint64_t estimate_timeout,
                                   std::vector<std::string>& download_success_files) {
    RETURN_IF_ERROR(client->init(get_segment_index_file_url));
    client->set_timeout_ms(estimate_timeout * 1000);
    RETURN_IF_ERROR(client->download(local_segment_index_path));
    // Track the file as soon as it exists locally, before any validation, so
    // that a file failing the checks below is still on the caller's list.
    download_success_files.push_back(local_segment_index_path);

    std::string remote_file_md5;
    RETURN_IF_ERROR(client->get_content_md5(&remote_file_md5));

    LOG(INFO) << "download segment index file to " << local_segment_index_path
              << ", remote md5: " << remote_file_md5
              << ", remote size: " << segment_index_file_size;

    std::error_code ec;
    // Check file length
    uint64_t local_index_file_size = std::filesystem::file_size(local_segment_index_path, ec);
    if (ec) {
        LOG(WARNING) << "download index file error: " << ec.message();
        return Status::IOError("can't retrieve file_size of {}, due to {}",
                               local_segment_index_path, ec.message());
    }
    if (local_index_file_size != segment_index_file_size) {
        LOG(WARNING) << "download index file length error"
                     << ", get_segment_index_file_url=" << get_segment_index_file_url
                     << ", index_file_size=" << segment_index_file_size
                     << ", local_index_file_size=" << local_index_file_size;
        return Status::RuntimeError("downloaded index file size is not equal, local={}, remote={}",
                                    local_index_file_size, segment_index_file_size);
    }

    // An empty remote MD5 skips the content check (keeps compatibility with
    // peers that do not report one).
    if (!remote_file_md5.empty()) {
        std::string local_file_md5;
        RETURN_IF_ERROR(
                io::global_local_filesystem()->md5sum(local_segment_index_path, &local_file_md5));
        if (local_file_md5 != remote_file_md5) {
            LOG(WARNING) << "download file md5 error"
                         << ", get_segment_index_file_url=" << get_segment_index_file_url
                         << ", remote_file_md5=" << remote_file_md5
                         << ", local_file_md5=" << local_file_md5;
            return Status::RuntimeError("download file md5 is not equal, local={}, remote={}",
                                        local_file_md5, remote_file_md5);
        }
    }

    return io::global_local_filesystem()->permission(local_segment_index_path,
                                                     io::LocalFileSystem::PERMS_OWNER_RW);
}
219
220
0
void _ingest_binlog(StorageEngine& engine, IngestBinlogArg* arg) {
221
0
    std::optional<HttpClient> client;
222
0
    if (config::enable_ingest_binlog_with_persistent_connection) {
223
        // Save the http client instance for persistent connection
224
0
        client = std::make_optional<HttpClient>();
225
0
    }
226
227
0
    auto txn_id = arg->txn_id;
228
0
    auto partition_id = arg->partition_id;
229
0
    auto local_tablet_id = arg->local_tablet_id;
230
0
    const auto& local_tablet = arg->local_tablet;
231
0
    const auto& local_tablet_uid = local_tablet->tablet_uid();
232
233
0
    std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
234
0
            MemTrackerLimiter::Type::OTHER, fmt::format("IngestBinlog#TxnId={}", txn_id));
235
0
    SCOPED_ATTACH_TASK(mem_tracker);
236
237
0
    auto& request = arg->request;
238
239
0
    MonotonicStopWatch watch(true);
240
0
    int64_t total_download_bytes = 0;
241
0
    int64_t total_download_files = 0;
242
0
    TStatus tstatus;
243
0
    std::vector<std::string> download_success_files;
244
0
    std::unordered_map<std::string_view, uint64_t> elapsed_time_map;
245
0
    Defer defer {[=, &engine, &tstatus, ingest_binlog_tstatus = arg->tstatus, &watch,
246
0
                  &total_download_bytes, &total_download_files, &elapsed_time_map]() {
247
0
        g_ingest_binlog_latency << watch.elapsed_time_microseconds();
248
0
        auto elapsed_time_ms = static_cast<int64_t>(watch.elapsed_time_milliseconds());
249
0
        double copy_rate = 0.0;
250
0
        if (elapsed_time_ms > 0) {
251
0
            copy_rate = (double)total_download_bytes / ((double)elapsed_time_ms) / 1000;
252
0
        }
253
0
        LOG(INFO) << "ingest binlog elapsed " << elapsed_time_ms << " ms, download "
254
0
                  << total_download_files << " files, total " << total_download_bytes
255
0
                  << " bytes, avg rate " << copy_rate
256
0
                  << " MB/s. result: " << apache::thrift::ThriftDebugString(tstatus);
257
0
        if (config::ingest_binlog_elapsed_threshold_ms >= 0 &&
258
0
            elapsed_time_ms > config::ingest_binlog_elapsed_threshold_ms) {
259
0
            auto elapsed_details_view =
260
0
                    elapsed_time_map | std::views::transform([](const auto& pair) {
261
0
                        return fmt::format("{}:{}", pair.first, pair.second);
262
0
                    });
263
0
            std::string elapsed_details = fmt::format("{}", fmt::join(elapsed_details_view, ", "));
264
0
            LOG(WARNING) << "ingest binlog elapsed " << elapsed_time_ms << " ms, "
265
0
                         << elapsed_details;
266
0
        }
267
0
        if (tstatus.status_code != TStatusCode::OK) {
268
            // abort txn
269
0
            engine.txn_manager()->abort_txn(partition_id, txn_id, local_tablet_id,
270
0
                                            local_tablet_uid);
271
            // delete all successfully downloaded files
272
0
            LOG(WARNING) << "will delete downloaded success files due to error " << tstatus;
273
0
            std::vector<io::Path> paths;
274
0
            for (const auto& file : download_success_files) {
275
0
                paths.emplace_back(file);
276
0
                LOG(WARNING) << "will delete downloaded success file " << file << " due to error";
277
0
            }
278
0
            static_cast<void>(io::global_local_filesystem()->batch_delete(paths));
279
0
            LOG(WARNING) << "done delete downloaded success files due to error " << tstatus;
280
0
        }
281
282
0
        if (ingest_binlog_tstatus) {
283
0
            *ingest_binlog_tstatus = std::move(tstatus);
284
0
        }
285
0
    }};
286
287
0
    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) {
288
0
        tstatus.__set_status_code(code);
289
0
        tstatus.__isset.error_msgs = true;
290
0
        tstatus.error_msgs.push_back(std::move(error_msg));
291
0
    };
292
293
0
    auto estimate_download_timeout = [](int64_t file_size) {
294
0
        uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
295
0
        if (estimate_timeout < config::download_low_speed_time) {
296
0
            estimate_timeout = config::download_low_speed_time;
297
0
        }
298
0
        return estimate_timeout;
299
0
    };
300
301
    // Step 3: get binlog info
302
0
    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
303
0
                                      request.remote_port);
304
0
    constexpr int max_retry = 3;
305
306
0
    auto get_binlog_info_url =
307
0
            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url,
308
0
                        "get_binlog_info", request.remote_tablet_id, request.binlog_version);
309
0
    std::string binlog_info;
310
0
    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) {
311
0
        RETURN_IF_ERROR(client->init(get_binlog_info_url));
312
0
        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
313
0
        return client->execute(&binlog_info);
314
0
    };
315
0
    auto status = _exec_http_req(client, max_retry, 1, get_binlog_info_cb);
316
0
    if (!status.ok()) {
317
0
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
318
0
                     << ", status=" << status.to_string();
319
0
        status.to_thrift(&tstatus);
320
0
        return;
321
0
    }
322
0
    elapsed_time_map.emplace("get_binlog_info", watch.elapsed_time_microseconds());
323
324
0
    std::vector<std::string> binlog_info_parts = absl::StrSplit(binlog_info, ":");
325
0
    if (binlog_info_parts.size() != 2) {
326
0
        status = Status::RuntimeError("failed to parse binlog info into 2 parts: {}", binlog_info);
327
0
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
328
0
                     << ", status=" << status.to_string();
329
0
        status.to_thrift(&tstatus);
330
0
        return;
331
0
    }
332
0
    std::string remote_rowset_id = std::move(binlog_info_parts[0]);
333
0
    int64_t num_segments = -1;
334
0
    try {
335
0
        num_segments = std::stoll(binlog_info_parts[1]);
336
0
    } catch (std::exception& e) {
337
0
        status = Status::RuntimeError("failed to parse num segments from binlog info {}: {}",
338
0
                                      binlog_info, e.what());
339
0
        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
340
0
                     << ", status=" << status;
341
0
        status.to_thrift(&tstatus);
342
0
        return;
343
0
    }
344
345
    // Step 4: get rowset meta
346
0
    auto get_rowset_meta_url = fmt::format(
347
0
            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url,
348
0
            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version);
349
0
    std::string rowset_meta_str;
350
0
    auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) {
351
0
        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
352
0
        client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
353
0
        return client->execute(&rowset_meta_str);
354
0
    };
355
0
    status = _exec_http_req(client, max_retry, 1, get_rowset_meta_cb);
356
0
    if (!status.ok()) {
357
0
        LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
358
0
                     << ", status=" << status.to_string();
359
0
        status.to_thrift(&tstatus);
360
0
        return;
361
0
    }
362
0
    elapsed_time_map.emplace("get_rowset_meta", watch.elapsed_time_microseconds());
363
364
0
    RowsetMetaPB rowset_meta_pb;
365
0
    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
366
0
        LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url;
367
0
        status = Status::InternalError("failed to parse rowset meta");
368
0
        status.to_thrift(&tstatus);
369
0
        return;
370
0
    }
371
    // save source rowset id and tablet id
372
0
    rowset_meta_pb.set_source_rowset_id(remote_rowset_id);
373
0
    rowset_meta_pb.set_source_tablet_id(request.remote_tablet_id);
374
    // rewrite rowset meta
375
0
    rowset_meta_pb.set_tablet_id(local_tablet_id);
376
0
    rowset_meta_pb.set_partition_id(partition_id);
377
0
    rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
378
0
    rowset_meta_pb.set_txn_id(txn_id);
379
0
    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
380
0
    auto rowset_meta = std::make_shared<RowsetMeta>();
381
0
    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
382
0
        LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url;
383
0
        status = Status::InternalError("failed to init rowset meta");
384
0
        status.to_thrift(&tstatus);
385
0
        return;
386
0
    }
387
0
    RowsetId new_rowset_id = engine.next_rowset_id();
388
0
    auto pending_rs_guard = engine.pending_local_rowsets().add(new_rowset_id);
389
0
    rowset_meta->set_rowset_id(new_rowset_id);
390
0
    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
391
392
    // Step 5: get all segment files
393
    // Step 5.1: get all segment files size
394
0
    std::vector<std::string> segment_file_urls;
395
0
    segment_file_urls.reserve(num_segments);
396
0
    std::vector<uint64_t> segment_file_sizes;
397
0
    segment_file_sizes.reserve(num_segments);
398
0
    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
399
0
        auto get_segment_file_size_url = fmt::format(
400
0
                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url,
401
0
                "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index);
402
0
        uint64_t segment_file_size;
403
0
        auto get_segment_file_size_cb = [&get_segment_file_size_url,
404
0
                                         &segment_file_size](HttpClient* client) {
405
0
            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
406
0
            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
407
0
            RETURN_IF_ERROR(client->head());
408
0
            return client->get_content_length(&segment_file_size);
409
0
        };
410
411
0
        status = _exec_http_req(client, max_retry, 1, get_segment_file_size_cb);
412
0
        if (!status.ok()) {
413
0
            LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
414
0
                         << ", status=" << status.to_string();
415
0
            status.to_thrift(&tstatus);
416
0
            return;
417
0
        }
418
419
0
        segment_file_sizes.push_back(segment_file_size);
420
0
        segment_file_urls.push_back(std::move(get_segment_file_size_url));
421
0
    }
422
0
    elapsed_time_map.emplace("get_segment_file_size", watch.elapsed_time_microseconds());
423
424
    // Step 5.2: check data capacity
425
0
    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(),
426
0
                                          0ULL); // NOLINT(bugprone-fold-init-type)
427
0
    if (!local_tablet->can_add_binlog(total_size)) {
428
0
        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size
429
0
                     << ", tablet=" << local_tablet->tablet_id();
430
0
        status = Status::InternalError("no enough space");
431
0
        status.to_thrift(&tstatus);
432
0
        return;
433
0
    }
434
0
    total_download_bytes = total_size;
435
0
    total_download_files = num_segments;
436
437
    // Step 5.3: get all segment files
438
0
    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
439
0
        auto segment_file_size = segment_file_sizes[segment_index];
440
0
        auto get_segment_file_url = segment_file_urls[segment_index];
441
0
        if (config::enable_download_md5sum_check) {
442
0
            get_segment_file_url = fmt::format("{}&acquire_md5=true", get_segment_file_url);
443
0
        }
444
445
0
        auto segment_path = local_segment_path(local_tablet->tablet_path(),
446
0
                                               rowset_meta->rowset_id().to_string(), segment_index);
447
0
        LOG(INFO) << "download segment file from " << get_segment_file_url << " to "
448
0
                  << segment_path;
449
0
        uint64_t estimate_timeout = estimate_download_timeout(segment_file_size);
450
0
        auto get_segment_file_cb = [&get_segment_file_url, &segment_path, segment_file_size,
451
0
                                    estimate_timeout, &download_success_files](HttpClient* client) {
452
0
            return _download_binlog_segment_file(client, get_segment_file_url, segment_path,
453
0
                                                 segment_file_size, estimate_timeout,
454
0
                                                 download_success_files);
455
0
        };
456
457
0
        status = _exec_http_req(client, max_retry, 1, get_segment_file_cb);
458
0
        if (!status.ok()) {
459
0
            LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
460
0
                         << ", status=" << status.to_string();
461
0
            status.to_thrift(&tstatus);
462
0
            return;
463
0
        }
464
0
    }
465
0
    elapsed_time_map.emplace("get_segment_files", watch.elapsed_time_microseconds());
466
467
    // Step 6: get all segment index files
468
    // Step 6.1: get all segment index files size
469
0
    std::vector<std::string> segment_index_file_urls;
470
0
    std::vector<uint64_t> segment_index_file_sizes;
471
0
    std::vector<std::string> segment_index_file_names;
472
0
    auto tablet_schema = rowset_meta->tablet_schema();
473
0
    if (tablet_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) {
474
0
        for (const auto& index : tablet_schema->inverted_indexes()) {
475
0
            auto index_id = index->index_id();
476
0
            for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
477
0
                auto get_segment_index_file_size_url = fmt::format(
478
0
                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
479
0
                        "}",
480
0
                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
481
0
                        remote_rowset_id, segment_index, index_id);
482
0
                uint64_t segment_index_file_size;
483
0
                auto get_segment_index_file_size_cb =
484
0
                        [&get_segment_index_file_size_url,
485
0
                         &segment_index_file_size](HttpClient* client) {
486
0
                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
487
0
                            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
488
0
                            RETURN_IF_ERROR(client->head());
489
0
                            return client->get_content_length(&segment_index_file_size);
490
0
                        };
491
492
0
                auto segment_path =
493
0
                        local_segment_path(local_tablet->tablet_path(),
494
0
                                           rowset_meta->rowset_id().to_string(), segment_index);
495
0
                segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v1(
496
0
                        InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), index_id,
497
0
                        index->get_index_suffix()));
498
499
0
                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
500
0
                if (!status.ok()) {
501
0
                    LOG(WARNING) << "failed to get segment file size from "
502
0
                                 << get_segment_index_file_size_url
503
0
                                 << ", status=" << status.to_string();
504
0
                    status.to_thrift(&tstatus);
505
0
                    return;
506
0
                }
507
508
0
                segment_index_file_sizes.push_back(segment_index_file_size);
509
0
                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
510
0
            }
511
0
        }
512
0
    } else {
513
0
        for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
514
0
            if (tablet_schema->has_inverted_index() || tablet_schema->has_ann_index()) {
515
0
                auto get_segment_index_file_size_url = fmt::format(
516
0
                        "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}&segment_index_id={"
517
0
                        "}",
518
0
                        binlog_api_url, "get_segment_index_file", request.remote_tablet_id,
519
0
                        remote_rowset_id, segment_index, -1);
520
0
                uint64_t segment_index_file_size;
521
0
                auto get_segment_index_file_size_cb =
522
0
                        [&get_segment_index_file_size_url,
523
0
                         &segment_index_file_size](HttpClient* client) {
524
0
                            RETURN_IF_ERROR(client->init(get_segment_index_file_size_url));
525
0
                            client->set_timeout_ms(config::download_binlog_meta_timeout_ms);
526
0
                            RETURN_IF_ERROR(client->head());
527
0
                            return client->get_content_length(&segment_index_file_size);
528
0
                        };
529
0
                auto segment_path =
530
0
                        local_segment_path(local_tablet->tablet_path(),
531
0
                                           rowset_meta->rowset_id().to_string(), segment_index);
532
0
                segment_index_file_names.push_back(InvertedIndexDescriptor::get_index_file_path_v2(
533
0
                        InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)));
534
535
0
                status = _exec_http_req(client, max_retry, 1, get_segment_index_file_size_cb);
536
0
                if (!status.ok()) {
537
0
                    LOG(WARNING) << "failed to get segment file size from "
538
0
                                 << get_segment_index_file_size_url
539
0
                                 << ", status=" << status.to_string();
540
0
                    status.to_thrift(&tstatus);
541
0
                    return;
542
0
                }
543
544
0
                segment_index_file_sizes.push_back(segment_index_file_size);
545
0
                segment_index_file_urls.push_back(std::move(get_segment_index_file_size_url));
546
0
            }
547
0
        }
548
0
    }
549
0
    elapsed_time_map.emplace("get_segment_index_file_size", watch.elapsed_time_microseconds());
550
551
    // Step 6.2: check data capacity
552
0
    uint64_t total_index_size =
553
0
            std::accumulate(segment_index_file_sizes.begin(), segment_index_file_sizes.end(),
554
0
                            0ULL); // NOLINT(bugprone-fold-init-type)
555
0
    if (!local_tablet->can_add_binlog(total_index_size)) {
556
0
        LOG(WARNING) << "failed to add binlog, no enough space, total_index_size="
557
0
                     << total_index_size << ", tablet=" << local_tablet->tablet_id();
558
0
        status = Status::InternalError("no enough space");
559
0
        status.to_thrift(&tstatus);
560
0
        return;
561
0
    }
562
0
    total_download_bytes += total_index_size;
563
0
    total_download_files += segment_index_file_urls.size();
564
565
    // Step 6.3: get all segment index files
566
0
    DCHECK(segment_index_file_sizes.size() == segment_index_file_names.size());
567
0
    DCHECK(segment_index_file_names.size() == segment_index_file_urls.size());
568
0
    for (int64_t i = 0; i < segment_index_file_urls.size(); ++i) {
569
0
        auto segment_index_file_size = segment_index_file_sizes[i];
570
0
        auto get_segment_index_file_url = segment_index_file_urls[i];
571
0
        if (config::enable_download_md5sum_check) {
572
0
            get_segment_index_file_url =
573
0
                    fmt::format("{}&acquire_md5=true", get_segment_index_file_url);
574
0
        }
575
576
0
        uint64_t estimate_timeout = estimate_download_timeout(segment_index_file_size);
577
0
        auto local_segment_index_path = segment_index_file_names[i];
578
0
        LOG(INFO) << fmt::format("download segment index file from {} to {}",
579
0
                                 get_segment_index_file_url, local_segment_index_path);
580
0
        auto get_segment_index_file_cb = [&get_segment_index_file_url, &local_segment_index_path,
581
0
                                          segment_index_file_size, estimate_timeout,
582
0
                                          &download_success_files](HttpClient* client) {
583
0
            return _download_binlog_index_file(client, get_segment_index_file_url,
584
0
                                               local_segment_index_path, segment_index_file_size,
585
0
                                               estimate_timeout, download_success_files);
586
0
        };
587
588
0
        status = _exec_http_req(client, max_retry, 1, get_segment_index_file_cb);
589
0
        if (!status.ok()) {
590
0
            LOG(WARNING) << "failed to get segment index file from " << get_segment_index_file_url
591
0
                         << ", status=" << status.to_string();
592
0
            status.to_thrift(&tstatus);
593
0
            return;
594
0
        }
595
0
    }
596
0
    elapsed_time_map.emplace("get_segment_index_files", watch.elapsed_time_microseconds());
597
598
    // Step 7: create rowset && calculate delete bitmap && commit
599
    // Step 7.1: create rowset
600
0
    RowsetSharedPtr rowset;
601
0
    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
602
0
                                          local_tablet->tablet_path(), rowset_meta, &rowset);
603
0
    if (!status) {
604
0
        LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet"
605
0
                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
606
0
                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
607
0
                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id
608
0
                     << ", status=" << status.to_string();
609
0
        status.to_thrift(&tstatus);
610
0
        return;
611
0
    }
612
613
    // Step 7.2 calculate delete bitmap before commit
614
0
    auto calc_delete_bitmap_token = engine.calc_delete_bitmap_executor()->create_token();
615
0
    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
616
0
    RowsetIdUnorderedSet pre_rowset_ids;
617
0
    if (local_tablet->enable_unique_key_merge_on_write()) {
618
0
        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
619
0
        std::vector<segment_v2::SegmentSharedPtr> segments;
620
0
        status = beta_rowset->load_segments(&segments);
621
0
        if (!status) {
622
0
            LOG(WARNING) << "failed to load segments from rowset"
623
0
                         << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
624
0
                         << ", status=" << status.to_string();
625
0
            status.to_thrift(&tstatus);
626
0
            return;
627
0
        }
628
0
        elapsed_time_map.emplace("load_segments", watch.elapsed_time_microseconds());
629
0
        if (segments.size() > 1) {
630
            // calculate delete bitmap between segments
631
0
            status = local_tablet->calc_delete_bitmap_between_segments(
632
0
                    rowset->tablet_schema(), rowset->rowset_id(), segments, delete_bitmap);
633
0
            if (!status) {
634
0
                LOG(WARNING) << "failed to calculate delete bitmap"
635
0
                             << ". tablet_id: " << local_tablet->tablet_id()
636
0
                             << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
637
0
                             << ", status=" << status.to_string();
638
0
                status.to_thrift(&tstatus);
639
0
                return;
640
0
            }
641
0
            elapsed_time_map.emplace("calc_delete_bitmap", watch.elapsed_time_microseconds());
642
0
        }
643
644
0
        static_cast<void>(BaseTablet::commit_phase_update_delete_bitmap(
645
0
                local_tablet, rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
646
0
                calc_delete_bitmap_token.get(), nullptr));
647
0
        elapsed_time_map.emplace("commit_phase_update_delete_bitmap",
648
0
                                 watch.elapsed_time_microseconds());
649
0
        static_cast<void>(calc_delete_bitmap_token->wait());
650
0
        elapsed_time_map.emplace("wait_delete_bitmap", watch.elapsed_time_microseconds());
651
0
    }
652
653
    // Step 7.3: commit txn
654
0
    Status commit_txn_status = engine.txn_manager()->commit_txn(
655
0
            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
656
0
            rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
657
0
            rowset_meta->load_id(), rowset, std::move(pending_rs_guard), false);
658
0
    if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
659
0
        auto err_msg = fmt::format(
660
0
                "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, "
661
0
                "txn_id={}, status={}",
662
0
                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
663
0
                rowset_meta->txn_id(), commit_txn_status.to_string());
664
0
        LOG(WARNING) << err_msg;
665
0
        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
666
0
        return;
667
0
    }
668
0
    elapsed_time_map.emplace("commit_txn", watch.elapsed_time_microseconds());
669
670
0
    if (local_tablet->enable_unique_key_merge_on_write()) {
671
0
        engine.txn_manager()->set_txn_related_delete_bitmap(partition_id, txn_id, local_tablet_id,
672
0
                                                            local_tablet->tablet_uid(), true,
673
0
                                                            delete_bitmap, pre_rowset_ids, nullptr);
674
0
        elapsed_time_map.emplace("set_txn_related_delete_bitmap",
675
0
                                 watch.elapsed_time_microseconds());
676
0
    }
677
678
0
    tstatus.__set_status_code(TStatusCode::OK);
679
0
}
680
} // namespace
681
682
// Keeps `exec_env` and creates the AgentServer from it and from exec_env->cluster_info().
// `exec_env` is dereferenced here, so it must not be null.
BaseBackendService::BaseBackendService(ExecEnv* exec_env)
683
7
        : _exec_env(exec_env), _agent_server(new AgentServer(exec_env, exec_env->cluster_info())) {}
684
685
3
// Defaulted destructor, defined out of line in this translation unit.
BaseBackendService::~BaseBackendService() = default;
686
687
// Forwards `exec_env` to the BaseBackendService constructor and binds `_engine` to `engine`.
BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env)
688
6
        : BaseBackendService(exec_env), _engine(engine) {}
689
690
// Defaulted destructor, defined out of line in this translation unit.
BackendService::~BackendService() = default;
691
692
/// Starts the agent workers of `service`, creates the "backend" thrift server on `port`
/// and, depending on config::ingest_binlog_work_pool_size, the ingest-binlog thread pool.
///
/// Pool size handling:
///   < 0  : no pool is created; ingest binlog runs in sync mode;
///   == 0 : the size is taken from std::thread::hardware_concurrency();
///   > 0  : the configured value is used.
/// The pool is built with `size` minimum and `2 * size` maximum threads.
///
/// @param engine   storage engine handed to the agent workers
/// @param exec_env execution environment handed to the agent workers
/// @param port     port passed to the ThriftServer
/// @param server   out: receives the newly created ThriftServer
/// @param service  the service instance wrapped by the thrift processor
/// @return Status::OK(); a failure to build the ingest-binlog pool is logged and the
///         service falls back to sync mode instead of failing start-up
Status BackendService::create_service(StorageEngine& engine, ExecEnv* exec_env, int port,
                                      std::unique_ptr<ThriftServer>* server,
                                      std::shared_ptr<doris::BackendService> service) {
    service->_agent_server->start_workers(engine, exec_env);
    // TODO: do we want a BoostThreadFactory?
    // TODO: we want separate thread factories here, so that fe requests can't starve
    // be requests
    auto be_processor = std::make_shared<BackendServiceProcessor>(service);

    *server = std::make_unique<ThriftServer>("backend", be_processor, port,
                                             config::be_service_threads);

    LOG(INFO) << "Doris BackendService listening on " << port;

    auto thread_num = config::ingest_binlog_work_pool_size;
    if (thread_num < 0) {
        LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we will in sync mode",
                                 thread_num);
        return Status::OK();
    }

    if (thread_num == 0) {
        thread_num = std::thread::hardware_concurrency();
        // hardware_concurrency() may return 0 when the value cannot be determined;
        // never ask for an empty pool.
        if (thread_num == 0) {
            thread_num = 1;
        }
    }

    auto pool_status = doris::ThreadPoolBuilder("IngestBinlog")
                               .set_min_threads(thread_num)
                               .set_max_threads(thread_num * 2)
                               .build(&(service->_ingest_binlog_workers));
    if (!pool_status.ok()) {
        // ingest_binlog() decides between async and sync mode by testing this pointer,
        // so clear it to make sure a half-built pool is never used.
        service->_ingest_binlog_workers = nullptr;
        LOG(WARNING) << "failed to build ingest binlog thread pool, fall back to sync mode"
                     << ", thread_num=" << thread_num << ", status=" << pool_status.to_string();
        return Status::OK();
    }

    LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async mode", thread_num);
    return Status::OK();
}
724
725
24
// Fills `result` by delegating to the storage engine's tablet manager.
void BackendService::get_tablet_stat(TTabletStatResult& result) {
726
24
    _engine.tablet_manager()->get_tablet_stat(&result);
727
24
}
728
729
0
int64_t BackendService::get_trash_used_capacity() {
730
0
    int64_t result = 0;
731
732
0
    std::vector<DataDirInfo> data_dir_infos;
733
0
    static_cast<void>(_engine.get_all_data_dir_info(&data_dir_infos, false /*do not update */));
734
735
    // uses excute sql `show trash`, then update backend trash capacity too.
736
0
    _engine.notify_listener("REPORT_DISK_STATE");
737
738
0
    for (const auto& root_path_info : data_dir_infos) {
739
0
        result += root_path_info.trash_used_capacity;
740
0
    }
741
742
0
    return result;
743
0
}
744
745
0
void BackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
746
0
    std::vector<DataDirInfo> data_dir_infos;
747
0
    static_cast<void>(_engine.get_all_data_dir_info(&data_dir_infos, false /*do not update */));
748
749
    // uses excute sql `show trash on <be>`, then update backend trash capacity too.
750
0
    _engine.notify_listener("REPORT_DISK_STATE");
751
752
0
    for (const auto& root_path_info : data_dir_infos) {
753
0
        TDiskTrashInfo diskTrashInfo;
754
0
        diskTrashInfo.__set_root_path(root_path_info.path);
755
0
        diskTrashInfo.__set_state(root_path_info.is_used ? "ONLINE" : "OFFLINE");
756
0
        diskTrashInfo.__set_trash_used_capacity(root_path_info.trash_used_capacity);
757
0
        diskTrashInfos.push_back(diskTrashInfo);
758
0
    }
759
0
}
760
761
void BaseBackendService::submit_routine_load_task(TStatus& t_status,
762
70
                                                  const std::vector<TRoutineLoadTask>& tasks) {
763
70
    for (auto& task : tasks) {
764
70
        Status st = _exec_env->routine_load_task_executor()->submit_task(task);
765
70
        if (!st.ok()) {
766
0
            LOG(WARNING) << "failed to submit routine load task. job id: " << task.job_id
767
0
                         << " task id: " << task.id;
768
0
            return st.to_thrift(&t_status);
769
0
        }
770
70
    }
771
772
70
    return Status::OK().to_thrift(&t_status);
773
70
}
774
775
/*
776
 * 1. validate user privilege (todo)
777
 * 2. FragmentMgr#exec_plan_fragment
778
 */
779
3
void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) {
780
3
    TStatus t_status;
781
3
    TUniqueId fragment_instance_id = generate_uuid();
782
    // A query_id is randomly generated to replace t_query_plan_info.query_id.
783
    // external query does not need to report anything to FE, so the query_id can be changed.
784
    // Otherwise, multiple independent concurrent open tablet scanners have the same query_id.
785
    // when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id).
786
3
    TUniqueId query_id = generate_uuid();
787
3
    std::shared_ptr<ScanContext> p_context;
788
3
    static_cast<void>(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context));
789
3
    p_context->fragment_instance_id = fragment_instance_id;
790
3
    p_context->offset = 0;
791
3
    p_context->last_access_time = time(nullptr);
792
3
    if (params.__isset.keep_alive_min) {
793
0
        p_context->keep_alive_min = params.keep_alive_min;
794
3
    } else {
795
3
        p_context->keep_alive_min = 5;
796
3
    }
797
798
3
    Status exec_st;
799
3
    TQueryPlanInfo t_query_plan_info;
800
3
    {
801
3
        const std::string& opaqued_query_plan = params.opaqued_query_plan;
802
3
        std::string query_plan_info;
803
        // base64 decode query plan
804
3
        if (!base64_decode(opaqued_query_plan, &query_plan_info)) {
805
0
            LOG(WARNING) << "open context error: base64_decode decode opaqued_query_plan failure";
806
0
            std::stringstream msg;
807
0
            msg << "query_plan_info: " << query_plan_info
808
0
                << " validate error, should not be modified after returned Doris FE processed";
809
0
            exec_st = Status::InvalidArgument(msg.str());
810
0
        }
811
812
3
        const uint8_t* buf = (const uint8_t*)query_plan_info.data();
813
3
        uint32_t len = (uint32_t)query_plan_info.size();
814
        // deserialize TQueryPlanInfo
815
3
        auto st = deserialize_thrift_msg(buf, &len, false, &t_query_plan_info);
816
3
        if (!st.ok()) {
817
0
            LOG(WARNING) << "open context error: deserialize TQueryPlanInfo failure";
818
0
            std::stringstream msg;
819
0
            msg << "query_plan_info: " << query_plan_info
820
0
                << " deserialize error, should not be modified after returned Doris FE processed";
821
0
            exec_st = Status::InvalidArgument(msg.str());
822
0
        }
823
3
        p_context->query_id = query_id;
824
3
    }
825
3
    std::vector<TScanColumnDesc> selected_columns;
826
3
    if (exec_st.ok()) {
827
        // start the scan procedure
828
3
        LOG(INFO) << fmt::format(
829
3
                "exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id "
830
3
                "= {}",
831
3
                print_id(t_query_plan_info.query_id), print_id(query_id),
832
3
                print_id(fragment_instance_id));
833
3
        exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment(
834
3
                params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns);
835
3
    }
836
3
    exec_st.to_thrift(&t_status);
837
    //return status
838
    // t_status.status_code = TStatusCode::OK;
839
3
    result_.status = t_status;
840
3
    result_.__set_context_id(p_context->context_id);
841
3
    result_.__set_selected_columns(selected_columns);
842
3
}
843
844
// fetch result from polling the queue, should always maintain the context offset, otherwise inconsistent result
845
5
void BaseBackendService::get_next(TScanBatchResult& result_, const TScanNextBatchParams& params) {
846
5
    std::string context_id = params.context_id;
847
5
    u_int64_t offset = params.offset;
848
5
    TStatus t_status;
849
5
    std::shared_ptr<ScanContext> context;
850
5
    Status st = _exec_env->external_scan_context_mgr()->get_scan_context(context_id, &context);
851
5
    if (!st.ok()) {
852
0
        st.to_thrift(&t_status);
853
0
        result_.status = t_status;
854
0
        return;
855
0
    }
856
5
    if (offset != context->offset) {
857
0
        LOG(ERROR) << "getNext error: context offset [" << context->offset << " ]"
858
0
                   << " ,client offset [ " << offset << " ]";
859
        // invalid offset
860
0
        t_status.status_code = TStatusCode::NOT_FOUND;
861
0
        t_status.error_msgs.push_back(
862
0
                absl::Substitute("context_id=$0, send_offset=$1, context_offset=$2", context_id,
863
0
                                 offset, context->offset));
864
0
        result_.status = t_status;
865
5
    } else {
866
        // during accessing, should disabled last_access_time
867
5
        context->last_access_time = -1;
868
5
        TUniqueId fragment_instance_id = context->fragment_instance_id;
869
5
        std::shared_ptr<arrow::RecordBatch> record_batch;
870
5
        bool eos;
871
872
5
        st = _exec_env->result_queue_mgr()->fetch_result(fragment_instance_id, &record_batch, &eos);
873
5
        if (st.ok()) {
874
5
            result_.__set_eos(eos);
875
5
            if (!eos) {
876
3
                std::string record_batch_str;
877
3
                st = serialize_record_batch(*record_batch, &record_batch_str);
878
3
                st.to_thrift(&t_status);
879
3
                if (st.ok()) {
880
                    // avoid copy large string
881
3
                    result_.rows = std::move(record_batch_str);
882
                    // set __isset
883
3
                    result_.__isset.rows = true;
884
3
                    context->offset += record_batch->num_rows();
885
3
                }
886
3
            }
887
5
        } else {
888
0
            LOG(WARNING) << "fragment_instance_id [" << print_id(fragment_instance_id)
889
0
                         << "] fetch result status [" << st.to_string() + "]";
890
0
            st.to_thrift(&t_status);
891
0
            result_.status = t_status;
892
0
        }
893
5
    }
894
5
    context->last_access_time = time(nullptr);
895
5
}
896
897
2
// Clears the scan context named by `params.context_id` and reports the outcome of that
// call in `result_.status`.
void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanCloseParams& params) {
    const std::string ctx_id = params.context_id;
    Status clear_st = _exec_env->external_scan_context_mgr()->clear_scan_context(ctx_id);

    TStatus thrift_status;
    clear_st.to_thrift(&thrift_status);
    result_.status = thrift_status;
}
904
905
// Delegates to the three-argument BaseBackendService::get_stream_load_record, supplying
// the stream load recorder obtained from the storage engine.
void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
906
12
                                            int64_t last_stream_record_time) {
907
12
    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
908
12
                                               _engine.get_stream_load_recorder());
909
12
}
910
911
0
// Fills `result` by delegating to the tablet manager's get_all_tablets_storage_format.
void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
912
0
    _engine.tablet_manager()->get_all_tablets_storage_format(&result);
913
0
}
914
915
void BackendService::make_snapshot(TAgentResult& return_value,
916
0
                                   const TSnapshotRequest& snapshot_request) {
917
0
    std::string snapshot_path;
918
0
    bool allow_incremental_clone = false;
919
0
    Status status = _engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path,
920
0
                                                          &allow_incremental_clone);
921
0
    if (!status) {
922
0
        LOG_WARNING("failed to make snapshot")
923
0
                .tag("tablet_id", snapshot_request.tablet_id)
924
0
                .tag("schema_hash", snapshot_request.schema_hash)
925
0
                .error(status);
926
0
    } else {
927
0
        LOG_INFO("successfully make snapshot")
928
0
                .tag("tablet_id", snapshot_request.tablet_id)
929
0
                .tag("schema_hash", snapshot_request.schema_hash)
930
0
                .tag("snapshot_path", snapshot_path);
931
0
        return_value.__set_snapshot_path(snapshot_path);
932
0
        return_value.__set_allow_incremental_clone(allow_incremental_clone);
933
0
    }
934
935
0
    status.to_thrift(&return_value.status);
936
0
    return_value.__set_snapshot_version(snapshot_request.preferred_snapshot_version);
937
0
}
938
939
void BackendService::release_snapshot(TAgentResult& return_value,
940
0
                                      const std::string& snapshot_path) {
941
0
    Status status = _engine.snapshot_mgr()->release_snapshot(snapshot_path);
942
0
    if (!status) {
943
0
        LOG_WARNING("failed to release snapshot").tag("snapshot_path", snapshot_path).error(status);
944
0
    } else {
945
0
        LOG_INFO("successfully release snapshot").tag("snapshot_path", snapshot_path);
946
0
    }
947
0
    status.to_thrift(&return_value.status);
948
0
}
949
950
void BackendService::ingest_binlog(TIngestBinlogResult& result,
951
0
                                   const TIngestBinlogRequest& request) {
952
0
    LOG(INFO) << "ingest binlog. request: " << apache::thrift::ThriftDebugString(request);
953
954
0
    TStatus tstatus;
955
0
    Defer defer {[&result, &tstatus]() {
956
0
        result.__set_status(tstatus);
957
0
        LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(result);
958
0
    }};
959
960
0
    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) {
961
0
        tstatus.__set_status_code(code);
962
0
        tstatus.__isset.error_msgs = true;
963
0
        tstatus.error_msgs.push_back(std::move(error_msg));
964
0
    };
965
966
0
    if (!config::enable_feature_binlog) {
967
0
        set_tstatus(TStatusCode::RUNTIME_ERROR, "enable feature binlog is false");
968
0
        return;
969
0
    }
970
971
    /// Check args: txn_id, remote_tablet_id, binlog_version, remote_host, remote_port, partition_id, load_id
972
0
    if (!request.__isset.txn_id) {
973
0
        auto error_msg = "txn_id is empty";
974
0
        LOG(WARNING) << error_msg;
975
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
976
0
        return;
977
0
    }
978
0
    if (!request.__isset.remote_tablet_id) {
979
0
        auto error_msg = "remote_tablet_id is empty";
980
0
        LOG(WARNING) << error_msg;
981
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
982
0
        return;
983
0
    }
984
0
    if (!request.__isset.binlog_version) {
985
0
        auto error_msg = "binlog_version is empty";
986
0
        LOG(WARNING) << error_msg;
987
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
988
0
        return;
989
0
    }
990
0
    if (!request.__isset.remote_host) {
991
0
        auto error_msg = "remote_host is empty";
992
0
        LOG(WARNING) << error_msg;
993
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
994
0
        return;
995
0
    }
996
0
    if (!request.__isset.remote_port) {
997
0
        auto error_msg = "remote_port is empty";
998
0
        LOG(WARNING) << error_msg;
999
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
1000
0
        return;
1001
0
    }
1002
0
    if (!request.__isset.partition_id) {
1003
0
        auto error_msg = "partition_id is empty";
1004
0
        LOG(WARNING) << error_msg;
1005
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
1006
0
        return;
1007
0
    }
1008
0
    if (!request.__isset.local_tablet_id) {
1009
0
        auto error_msg = "local_tablet_id is empty";
1010
0
        LOG(WARNING) << error_msg;
1011
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
1012
0
        return;
1013
0
    }
1014
0
    if (!request.__isset.load_id) {
1015
0
        auto error_msg = "load_id is empty";
1016
0
        LOG(WARNING) << error_msg;
1017
0
        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
1018
0
        return;
1019
0
    }
1020
1021
0
    auto txn_id = request.txn_id;
1022
    // Step 1: get local tablet
1023
0
    auto const& local_tablet_id = request.local_tablet_id;
1024
0
    auto local_tablet = _engine.tablet_manager()->get_tablet(local_tablet_id);
1025
0
    if (local_tablet == nullptr) {
1026
0
        auto error_msg = fmt::format("tablet {} not found", local_tablet_id);
1027
0
        LOG(WARNING) << error_msg;
1028
0
        set_tstatus(TStatusCode::TABLET_MISSING, std::move(error_msg));
1029
0
        return;
1030
0
    }
1031
1032
    // Step 2: check txn, create txn, prepare_txn will check it
1033
0
    auto partition_id = request.partition_id;
1034
0
    auto& load_id = request.load_id;
1035
0
    auto is_ingrest = true;
1036
0
    PUniqueId p_load_id;
1037
0
    p_load_id.set_hi(load_id.hi);
1038
0
    p_load_id.set_lo(load_id.lo);
1039
1040
0
    {
1041
        // TODO: Before push_lock is not held, but I think it should hold.
1042
0
        auto status = local_tablet->prepare_txn(partition_id, txn_id, p_load_id, is_ingrest);
1043
0
        if (!status.ok()) {
1044
0
            LOG(WARNING) << "prepare txn failed. txn_id=" << txn_id
1045
0
                         << ", status=" << status.to_string();
1046
0
            status.to_thrift(&tstatus);
1047
0
            return;
1048
0
        }
1049
0
    }
1050
1051
0
    bool is_async = (_ingest_binlog_workers != nullptr);
1052
0
    result.__set_is_async(is_async);
1053
1054
0
    auto ingest_binlog_func = [=, this, tstatus = &tstatus]() {
1055
0
        IngestBinlogArg ingest_binlog_arg = {
1056
0
                .txn_id = txn_id,
1057
0
                .partition_id = partition_id,
1058
0
                .local_tablet_id = local_tablet_id,
1059
0
                .local_tablet = local_tablet,
1060
1061
0
                .request = request,
1062
0
                .tstatus = is_async ? nullptr : tstatus,
1063
0
        };
1064
1065
0
        _ingest_binlog(_engine, &ingest_binlog_arg);
1066
0
    };
1067
1068
0
    if (is_async) {
1069
0
        auto status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
1070
0
        if (!status.ok()) {
1071
0
            status.to_thrift(&tstatus);
1072
0
            return;
1073
0
        }
1074
0
    } else {
1075
0
        ingest_binlog_func();
1076
0
    }
1077
0
}
1078
1079
void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
1080
0
                                         const TQueryIngestBinlogRequest& request) {
1081
0
    LOG(INFO) << "query ingest binlog. request: " << apache::thrift::ThriftDebugString(request);
1082
1083
0
    auto set_result = [&](TIngestBinlogStatus::type status, std::string error_msg) {
1084
0
        result.__set_status(status);
1085
0
        result.__set_err_msg(std::move(error_msg));
1086
0
    };
1087
1088
    /// Check args: txn_id, partition_id, tablet_id, load_id
1089
0
    if (!request.__isset.txn_id) {
1090
0
        auto error_msg = "txn_id is empty";
1091
0
        LOG(WARNING) << error_msg;
1092
0
        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
1093
0
        return;
1094
0
    }
1095
0
    if (!request.__isset.partition_id) {
1096
0
        auto error_msg = "partition_id is empty";
1097
0
        LOG(WARNING) << error_msg;
1098
0
        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
1099
0
        return;
1100
0
    }
1101
0
    if (!request.__isset.tablet_id) {
1102
0
        auto error_msg = "tablet_id is empty";
1103
0
        LOG(WARNING) << error_msg;
1104
0
        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
1105
0
        return;
1106
0
    }
1107
0
    if (!request.__isset.load_id) {
1108
0
        auto error_msg = "load_id is empty";
1109
0
        LOG(WARNING) << error_msg;
1110
0
        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
1111
0
        return;
1112
0
    }
1113
1114
0
    auto partition_id = request.partition_id;
1115
0
    auto txn_id = request.txn_id;
1116
0
    auto tablet_id = request.tablet_id;
1117
1118
    // Step 1: get local tablet
1119
0
    auto local_tablet = _engine.tablet_manager()->get_tablet(tablet_id);
1120
0
    if (local_tablet == nullptr) {
1121
0
        auto error_msg = fmt::format("tablet {} not found", tablet_id);
1122
0
        LOG(WARNING) << error_msg;
1123
0
        set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg));
1124
0
        return;
1125
0
    }
1126
1127
    // Step 2: get txn state
1128
0
    auto tablet_uid = local_tablet->tablet_uid();
1129
0
    auto txn_state =
1130
0
            _engine.txn_manager()->get_txn_state(partition_id, txn_id, tablet_id, tablet_uid);
1131
0
    switch (txn_state) {
1132
0
    case TxnState::NOT_FOUND:
1133
0
        result.__set_status(TIngestBinlogStatus::NOT_FOUND);
1134
0
        break;
1135
0
    case TxnState::PREPARED:
1136
0
        result.__set_status(TIngestBinlogStatus::DOING);
1137
0
        break;
1138
0
    case TxnState::COMMITTED:
1139
0
        result.__set_status(TIngestBinlogStatus::OK);
1140
0
        break;
1141
0
    case TxnState::ROLLEDBACK:
1142
0
        result.__set_status(TIngestBinlogStatus::FAILED);
1143
0
        break;
1144
0
    case TxnState::ABORTED:
1145
0
        result.__set_status(TIngestBinlogStatus::FAILED);
1146
0
        break;
1147
0
    case TxnState::DELETED:
1148
0
        result.__set_status(TIngestBinlogStatus::FAILED);
1149
0
        break;
1150
0
    }
1151
0
}
1152
1153
0
// Default implementation on BaseBackendService: only logs an error; `result` is not modified.
void BaseBackendService::get_tablet_stat(TTabletStatResult& result) {
1154
0
    LOG(ERROR) << "get_tablet_stat is not implemented";
1155
0
}
1156
1157
3
// Default implementation on BaseBackendService: logs an error and returns 0.
int64_t BaseBackendService::get_trash_used_capacity() {
1158
3
    LOG(ERROR) << "get_trash_used_capacity is not implemented";
1159
3
    return 0;
1160
3
}
1161
1162
// Default two-argument implementation on BaseBackendService: only logs an error;
// `result` is not modified.
void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
1163
0
                                                int64_t last_stream_record_time) {
1164
0
    LOG(ERROR) << "get_stream_load_record is not implemented";
1165
0
}
1166
1167
void BaseBackendService::get_stream_load_record(
1168
        TStreamLoadRecordResult& result, int64_t last_stream_record_time,
1169
48
        std::shared_ptr<StreamLoadRecorder> stream_load_recorder) {
1170
48
    if (stream_load_recorder != nullptr) {
1171
48
        std::map<std::string, std::string> records;
1172
48
        auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
1173
48
                                                  config::stream_load_record_batch_size, &records);
1174
48
        if (st.ok()) {
1175
48
            LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
1176
48
                      << records.size()
1177
48
                      << ", last_stream_load_timestamp: " << last_stream_record_time;
1178
48
            std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
1179
48
            auto it = records.begin();
1180
2.18k
            for (; it != records.end(); ++it) {
1181
2.13k
                TStreamLoadRecord stream_load_item;
1182
2.13k
                StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
1183
2.13k
                stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
1184
2.13k
            }
1185
48
            result.__set_stream_load_record(stream_load_record_batch);
1186
48
        }
1187
48
    } else {
1188
0
        LOG(WARNING) << "stream_load_recorder is null.";
1189
0
    }
1190
48
}
1191
1192
0
// Default implementation on BaseBackendService: only logs an error; `diskTrashInfos` is
// not modified.
void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
1193
0
    LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
1194
0
}
1195
1196
// Default implementation on BaseBackendService: logs an error and sets a NotSupported
// status on `return_value`; `snapshot_request` is not used.
void BaseBackendService::make_snapshot(TAgentResult& return_value,
1197
0
                                       const TSnapshotRequest& snapshot_request) {
1198
0
    LOG(ERROR) << "make_snapshot is not implemented";
1199
0
    return_value.__set_status(Status::NotSupported("make_snapshot is not implemented").to_thrift());
1200
0
}
1201
1202
// Default implementation on BaseBackendService: logs an error and sets a NotSupported
// status on `return_value`; `snapshot_path` is not used.
void BaseBackendService::release_snapshot(TAgentResult& return_value,
1203
0
                                          const std::string& snapshot_path) {
1204
0
    LOG(ERROR) << "release_snapshot is not implemented";
1205
0
    return_value.__set_status(
1206
0
            Status::NotSupported("release_snapshot is not implemented").to_thrift());
1207
0
}
1208
1209
0
// Default implementation on BaseBackendService: only logs an error; `result` is not modified.
void BaseBackendService::check_storage_format(TCheckStorageFormatResult& result) {
1210
0
    LOG(ERROR) << "check_storage_format is not implemented";
1211
0
}
1212
1213
// Default implementation on BaseBackendService: logs an error and sets a NotSupported
// status on `result`; `request` is not used.
void BaseBackendService::ingest_binlog(TIngestBinlogResult& result,
1214
0
                                       const TIngestBinlogRequest& request) {
1215
0
    LOG(ERROR) << "ingest_binlog is not implemented";
1216
0
    result.__set_status(Status::NotSupported("ingest_binlog is not implemented").to_thrift());
1217
0
}
1218
1219
// Default implementation on BaseBackendService: logs an error and reports status UNKNOWN
// together with a "not implemented" error message; `request` is not used.
void BaseBackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
1220
0
                                             const TQueryIngestBinlogRequest& request) {
1221
0
    LOG(ERROR) << "query_ingest_binlog is not implemented";
1222
0
    result.__set_status(TIngestBinlogStatus::UNKNOWN);
1223
0
    result.__set_err_msg("query_ingest_binlog is not implemented");
1224
0
}
1225
1226
// Default implementation on BaseBackendService: logs an error and sets a NotSupported
// status on `response`; `request` is not used.
void BaseBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& response,
1227
0
                                             const TWarmUpCacheAsyncRequest& request) {
1228
0
    LOG(ERROR) << "warm_up_cache_async is not implemented";
1229
0
    response.__set_status(
1230
0
            Status::NotSupported("warm_up_cache_async is not implemented").to_thrift());
1231
0
}
1232
1233
// Default implementation on BaseBackendService: logs an error and sets a NotSupported
// status on `response`; `request` is not used.
void BaseBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
1234
0
                                                   const TCheckWarmUpCacheAsyncRequest& request) {
1235
0
    LOG(ERROR) << "check_warm_up_cache_async is not implemented";
1236
0
    response.__set_status(
1237
0
            Status::NotSupported("check_warm_up_cache_async is not implemented").to_thrift());
1238
0
}
1239
1240
// Default implementation on BaseBackendService: only logs an error; `response` is not
// modified and `request` is not used.
void BaseBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse& response,
1241
0
                                               const TSyncLoadForTabletsRequest& request) {
1242
0
    LOG(ERROR) << "sync_load_for_tablets is not implemented";
1243
0
}
1244
1245
void BaseBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response,
1246
0
                                                  const TGetTopNHotPartitionsRequest& request) {
1247
0
    LOG(ERROR) << "get_top_n_hot_partitions is not implemented";
1248
0
}
1249
1250
void BaseBackendService::warm_up_tablets(TWarmUpTabletsResponse& response,
1251
0
                                         const TWarmUpTabletsRequest& request) {
1252
0
    LOG(ERROR) << "warm_up_tablets is not implemented";
1253
0
    response.__set_status(Status::NotSupported("warm_up_tablets is not implemented").to_thrift());
1254
0
}
1255
1256
void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse& response,
1257
1
                                                  const TGetRealtimeExecStatusRequest& request) {
1258
1
    if (!request.__isset.id) {
1259
0
        LOG_WARNING("Invalidate argument, id is empty");
1260
0
        response.__set_status(Status::InvalidArgument("id is empty").to_thrift());
1261
0
        return;
1262
0
    }
1263
1264
1
    RuntimeProfile::Counter get_realtime_timer {TUnit::TIME_NS};
1265
1266
1
    Defer _print_log([&]() {
1267
1
        LOG_INFO("Getting realtime exec status of query {} , cost time {}", print_id(request.id),
1268
1
                 PrettyPrinter::print(get_realtime_timer.value(), get_realtime_timer.type()));
1269
1
    });
1270
1271
1
    SCOPED_TIMER(&get_realtime_timer);
1272
1273
1
    std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
1274
1
            std::make_unique<TReportExecStatusParams>();
1275
1
    std::unique_ptr<TQueryStatistics> query_stats = std::make_unique<TQueryStatistics>();
1276
1277
1
    std::string req_type = request.__isset.req_type ? request.req_type : "profile";
1278
1
    Status st;
1279
1
    if (req_type == "stats") {
1280
1
        st = ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id,
1281
1
                                                                          query_stats.get());
1282
1
        if (st.ok()) {
1283
1
            response.__set_query_stats(*query_stats);
1284
1
        }
1285
1
    } else {
1286
        // default is "profile"
1287
0
        st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
1288
0
                request.id, report_exec_status_params.get());
1289
0
        if (st.ok()) {
1290
0
            response.__set_report_exec_status_params(*report_exec_status_params);
1291
0
        }
1292
0
    }
1293
1294
1
    report_exec_status_params->__set_query_id(TUniqueId());
1295
1
    report_exec_status_params->__set_done(false);
1296
1
    response.__set_status(st.to_thrift());
1297
1
}
1298
1299
void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result,
1300
1.22k
                                               const std::vector<int64_t>& dictionary_ids) {
1301
1.22k
    std::vector<TDictionaryStatus> dictionary_status;
1302
1.22k
    ExecEnv::GetInstance()->dict_factory()->get_dictionary_status(dictionary_status,
1303
1.22k
                                                                  dictionary_ids);
1304
1.22k
    result.__set_dictionary_status_list(dictionary_status);
1305
1.22k
    LOG(INFO) << "query for dictionary status, return " << result.dictionary_status_list.size()
1306
1.22k
              << " rows";
1307
1.22k
}
1308
1309
void BaseBackendService::test_storage_connectivity(TTestStorageConnectivityResponse& response,
1310
0
                                                   const TTestStorageConnectivityRequest& request) {
1311
0
    Status status = io::StorageConnectivityTester::test(request.type, request.properties);
1312
0
    response.__set_status(status.to_thrift());
1313
0
}
1314
1315
2
void BaseBackendService::get_python_envs(std::vector<TPythonEnvInfo>& result) {
1316
2
    result = PythonVersionManager::instance().env_infos_to_thrift();
1317
2
}
1318
1319
void BaseBackendService::get_python_packages(std::vector<TPythonPackageInfo>& result,
1320
1
                                             const std::string& python_version) {
1321
1
    PythonVersion version;
1322
1
    auto& manager = PythonVersionManager::instance();
1323
1
    THROW_IF_ERROR(manager.get_version(python_version, &version));
1324
1325
1
    std::vector<std::pair<std::string, std::string>> packages;
1326
1
    THROW_IF_ERROR(list_installed_packages(version, &packages));
1327
1
    result = manager.package_infos_to_thrift(packages);
1328
1
}
1329
1330
#include "common/compile_check_end.h"
1331
} // namespace doris