Coverage Report

Created: 2026-05-09 09:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/agent/task_worker_pool.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "agent/task_worker_pool.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/AgentService_types.h>
23
#include <gen_cpp/DataSinks_types.h>
24
#include <gen_cpp/HeartbeatService_types.h>
25
#include <gen_cpp/MasterService_types.h>
26
#include <gen_cpp/Status_types.h>
27
#include <gen_cpp/Types_types.h>
28
#include <unistd.h>
29
30
#include <algorithm>
31
// IWYU pragma: no_include <bits/chrono.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
34
#include <atomic>
35
#include <chrono> // IWYU pragma: keep
36
#include <ctime>
37
#include <functional>
38
#include <memory>
39
#include <mutex>
40
#include <shared_mutex>
41
#include <sstream>
42
#include <string>
43
#include <thread>
44
#include <type_traits>
45
#include <utility>
46
#include <vector>
47
48
#include "agent/utils.h"
49
#include "cloud/cloud_delete_task.h"
50
#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
51
#include "cloud/cloud_schema_change_job.h"
52
#include "cloud/cloud_snapshot_loader.h"
53
#include "cloud/cloud_snapshot_mgr.h"
54
#include "cloud/cloud_tablet.h"
55
#include "cloud/cloud_tablet_mgr.h"
56
#include "cloud/config.h"
57
#include "common/config.h"
58
#include "common/logging.h"
59
#include "common/metrics/doris_metrics.h"
60
#include "common/status.h"
61
#include "io/fs/file_system.h"
62
#include "io/fs/hdfs_file_system.h"
63
#include "io/fs/local_file_system.h"
64
#include "io/fs/obj_storage_client.h"
65
#include "io/fs/path.h"
66
#include "io/fs/remote_file_system.h"
67
#include "io/fs/s3_file_system.h"
68
#include "runtime/exec_env.h"
69
#include "runtime/fragment_mgr.h"
70
#include "runtime/index_policy/index_policy_mgr.h"
71
#include "runtime/memory/global_memory_arbitrator.h"
72
#include "runtime/snapshot_loader.h"
73
#include "runtime/user_function_cache.h"
74
#include "service/backend_options.h"
75
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
76
#include "storage/data_dir.h"
77
#include "storage/olap_common.h"
78
#include "storage/rowset/rowset_meta.h"
79
#include "storage/snapshot/snapshot_manager.h"
80
#include "storage/storage_engine.h"
81
#include "storage/storage_policy.h"
82
#include "storage/tablet/tablet.h"
83
#include "storage/tablet/tablet_manager.h"
84
#include "storage/tablet/tablet_meta.h"
85
#include "storage/tablet/tablet_schema.h"
86
#include "storage/task/engine_batch_load_task.h"
87
#include "storage/task/engine_checksum_task.h"
88
#include "storage/task/engine_clone_task.h"
89
#include "storage/task/engine_cloud_index_change_task.h"
90
#include "storage/task/engine_index_change_task.h"
91
#include "storage/task/engine_publish_version_task.h"
92
#include "storage/task/engine_storage_migration_task.h"
93
#include "storage/txn/txn_manager.h"
94
#include "storage/utils.h"
95
#include "udf/python/python_server.h"
96
#include "util/brpc_client_cache.h"
97
#include "util/debug_points.h"
98
#include "util/jni-util.h"
99
#include "util/mem_info.h"
100
#include "util/random.h"
101
#include "util/s3_util.h"
102
#include "util/stopwatch.hpp"
103
#include "util/threadpool.h"
104
#include "util/time.h"
105
#include "util/trace.h"
106
107
namespace doris {
108
using namespace ErrorCode;
109
110
namespace {
111
112
std::mutex s_task_signatures_mtx;
113
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;
114
115
std::atomic_ulong s_report_version(time(nullptr) * 100000);
116
117
20.0k
void increase_report_version() {
118
20.0k
    s_report_version.fetch_add(1, std::memory_order_relaxed);
119
20.0k
}
120
121
// FIXME(plat1ko): Paired register and remove task info
122
68.6k
bool register_task_info(const TTaskType::type task_type, int64_t signature) {
123
68.6k
    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
124
68.6k
        task_type == TTaskType::type::PUSH_COOLDOWN_CONF ||
125
68.6k
        task_type == TTaskType::type::COMPACTION) {
126
        // no need to report task of these types
127
15
        return true;
128
15
    }
129
130
68.6k
    if (signature == -1) { // No need to report task with unintialized signature
131
2.57k
        return true;
132
2.57k
    }
133
134
66.0k
    std::lock_guard lock(s_task_signatures_mtx);
135
66.0k
    auto& set = s_task_signatures[task_type];
136
66.0k
    return set.insert(signature).second;
137
68.6k
}
138
139
37.7k
void remove_task_info(const TTaskType::type task_type, int64_t signature) {
140
37.7k
    size_t queue_size;
141
37.7k
    {
142
37.7k
        std::lock_guard lock(s_task_signatures_mtx);
143
37.7k
        auto& set = s_task_signatures[task_type];
144
37.7k
        set.erase(signature);
145
37.7k
        queue_size = set.size();
146
37.7k
    }
147
148
18.4E
    VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature
149
18.4E
                << ", queue_size=" << queue_size;
150
37.7k
}
151
152
37.2k
void finish_task(const TFinishTaskRequest& finish_task_request) {
153
    // Return result to FE
154
37.2k
    TMasterResult result;
155
37.2k
    uint32_t try_time = 0;
156
37.2k
    constexpr int TASK_FINISH_MAX_RETRY = 3;
157
37.5k
    while (try_time < TASK_FINISH_MAX_RETRY) {
158
37.5k
        DorisMetrics::instance()->finish_task_requests_total->increment(1);
159
37.5k
        Status client_status =
160
37.5k
                MasterServerClient::instance()->finish_task(finish_task_request, &result);
161
162
37.7k
        if (client_status.ok()) {
163
37.7k
            break;
164
18.4E
        } else {
165
18.4E
            DorisMetrics::instance()->finish_task_requests_failed->increment(1);
166
18.4E
            LOG_WARNING("failed to finish task")
167
18.4E
                    .tag("type", finish_task_request.task_type)
168
18.4E
                    .tag("signature", finish_task_request.signature)
169
18.4E
                    .error(result.status);
170
18.4E
            try_time += 1;
171
18.4E
        }
172
18.4E
        sleep(1);
173
18.4E
    }
174
37.2k
}
175
176
Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
177
0
                       const TSchemaHash schema_hash, TTabletInfo* tablet_info) {
178
0
    tablet_info->__set_tablet_id(tablet_id);
179
0
    tablet_info->__set_schema_hash(schema_hash);
180
0
    return engine.tablet_manager()->report_tablet_info(tablet_info);
181
0
}
182
183
1.20k
void random_sleep(int second) {
184
1.20k
    Random rnd(static_cast<uint32_t>(UnixMillis()));
185
1.20k
    sleep(rnd.Uniform(second) + 1);
186
1.20k
}
187
188
void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature,
189
0
                  const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
190
0
    Status status;
191
192
0
    std::string_view process_name = "alter tablet";
193
    // Check last schema change status, if failed delete tablet file
194
    // Do not need to adjust delete success or not
195
    // Because if delete failed create rollup will failed
196
0
    TTabletId new_tablet_id = 0;
197
0
    TSchemaHash new_schema_hash = 0;
198
0
    if (status.ok()) {
199
0
        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
200
0
        new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash;
201
0
        auto mem_tracker = MemTrackerLimiter::create_shared(
202
0
                MemTrackerLimiter::Type::SCHEMA_CHANGE,
203
0
                fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
204
0
                            std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
205
0
                            std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
206
0
                            engine.memory_limitation_bytes_per_thread_for_schema_change()));
207
0
        SCOPED_ATTACH_TASK(mem_tracker);
208
0
        DorisMetrics::instance()->create_rollup_requests_total->increment(1);
209
0
        Status res = Status::OK();
210
0
        try {
211
0
            LOG_INFO("start {}", process_name)
212
0
                    .tag("signature", agent_task_req.signature)
213
0
                    .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
214
0
                    .tag("new_tablet_id", new_tablet_id)
215
0
                    .tag("mem_limit",
216
0
                         engine.memory_limitation_bytes_per_thread_for_schema_change());
217
0
            SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2,
218
0
                                std::to_string(agent_task_req.alter_tablet_req_v2.__isset.job_id
219
0
                                                       ? agent_task_req.alter_tablet_req_v2.job_id
220
0
                                                       : 0));
221
0
            status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
222
0
        } catch (const Exception& e) {
223
0
            status = e.to_status();
224
0
        }
225
0
        if (!status.ok()) {
226
0
            DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
227
0
        }
228
0
    }
229
230
0
    if (status.ok()) {
231
0
        increase_report_version();
232
0
    }
233
234
    // Return result to fe
235
0
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
236
0
    finish_task_request->__set_report_version(s_report_version);
237
0
    finish_task_request->__set_task_type(task_type);
238
0
    finish_task_request->__set_signature(signature);
239
240
0
    std::vector<TTabletInfo> finish_tablet_infos;
241
0
    if (status.ok()) {
242
0
        TTabletInfo tablet_info;
243
0
        status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info);
244
0
        if (status.ok()) {
245
0
            finish_tablet_infos.push_back(tablet_info);
246
0
        }
247
0
    }
248
249
0
    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
250
0
        LOG_WARNING("failed to {}", process_name)
251
0
                .tag("signature", agent_task_req.signature)
252
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
253
0
                .tag("new_tablet_id", new_tablet_id)
254
0
                .error(status);
255
0
    } else {
256
0
        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
257
0
        LOG_INFO("successfully {}", process_name)
258
0
                .tag("signature", agent_task_req.signature)
259
0
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
260
0
                .tag("new_tablet_id", new_tablet_id);
261
0
    }
262
0
    finish_task_request->__set_task_status(status.to_thrift());
263
0
}
264
265
void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req,
266
                        int64_t signature, const TTaskType::type task_type,
267
10.6k
                        TFinishTaskRequest* finish_task_request) {
268
10.6k
    Status status;
269
270
10.6k
    std::string_view process_name = "alter tablet";
271
    // Check last schema change status, if failed delete tablet file
272
    // Do not need to adjust delete success or not
273
    // Because if delete failed create rollup will failed
274
10.6k
    TTabletId new_tablet_id = 0;
275
10.6k
    new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
276
10.6k
    auto mem_tracker = MemTrackerLimiter::create_shared(
277
10.6k
            MemTrackerLimiter::Type::SCHEMA_CHANGE,
278
10.6k
            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
279
10.6k
                        std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
280
10.6k
                        std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
281
10.6k
                        engine.memory_limitation_bytes_per_thread_for_schema_change()));
282
10.6k
    SCOPED_ATTACH_TASK(mem_tracker);
283
10.6k
    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
284
285
10.6k
    LOG_INFO("start {}", process_name)
286
10.6k
            .tag("signature", agent_task_req.signature)
287
10.6k
            .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
288
10.6k
            .tag("new_tablet_id", new_tablet_id)
289
10.6k
            .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change());
290
10.6k
    DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
291
10.6k
    CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
292
10.6k
                             agent_task_req.alter_tablet_req_v2.expiration);
293
10.6k
    status = [&]() {
294
10.6k
        HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
295
10.6k
                job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
296
10.6k
                [&](const doris::Exception& ex) {
297
10.6k
                    DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
298
10.6k
                    job.clean_up_on_failure();
299
10.6k
                });
300
10.1k
        return Status::OK();
301
10.6k
    }();
302
303
10.6k
    if (status.ok()) {
304
10.0k
        increase_report_version();
305
10.0k
        LOG_INFO("successfully {}", process_name)
306
10.0k
                .tag("signature", agent_task_req.signature)
307
10.0k
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
308
10.0k
                .tag("new_tablet_id", new_tablet_id);
309
10.0k
    } else {
310
608
        LOG_WARNING("failed to {}", process_name)
311
608
                .tag("signature", agent_task_req.signature)
312
608
                .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id)
313
608
                .tag("new_tablet_id", new_tablet_id)
314
608
                .error(status);
315
608
    }
316
317
    // Return result to fe
318
10.6k
    finish_task_request->__set_backend(BackendOptions::get_local_backend());
319
10.6k
    finish_task_request->__set_report_version(s_report_version);
320
10.6k
    finish_task_request->__set_task_type(task_type);
321
10.6k
    finish_task_request->__set_signature(signature);
322
10.6k
    finish_task_request->__set_task_status(status.to_thrift());
323
10.6k
}
324
325
Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req,
326
0
                             TabletSharedPtr& tablet, DataDir** dest_store) {
327
0
    int64_t tablet_id = req.tablet_id;
328
0
    tablet = engine.tablet_manager()->get_tablet(tablet_id);
329
0
    if (tablet == nullptr) {
330
0
        return Status::InternalError("could not find tablet {}", tablet_id);
331
0
    }
332
333
0
    if (req.__isset.data_dir) {
334
        // request specify the data dir
335
0
        *dest_store = engine.get_store(req.data_dir);
336
0
        if (*dest_store == nullptr) {
337
0
            return Status::InternalError("could not find data dir {}", req.data_dir);
338
0
        }
339
0
    } else {
340
        // this is a storage medium
341
        // get data dir by storage medium
342
343
        // judge case when no need to migrate
344
0
        uint32_t count = engine.available_storage_medium_type_count();
345
0
        if (count <= 1) {
346
0
            return Status::InternalError("available storage medium type count is less than 1");
347
0
        }
348
        // check current tablet storage medium
349
0
        TStorageMedium::type storage_medium = req.storage_medium;
350
0
        TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium();
351
0
        if (src_storage_medium == storage_medium) {
352
0
            return Status::InternalError("tablet is already on specified storage medium {}",
353
0
                                         storage_medium);
354
0
        }
355
        // get a random store of specified storage medium
356
0
        auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium);
357
0
        if (stores.empty()) {
358
0
            return Status::InternalError("failed to get root path for create tablet");
359
0
        }
360
361
0
        *dest_store = stores[0];
362
0
    }
363
0
    if (tablet->data_dir()->path() == (*dest_store)->path()) {
364
0
        LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
365
0
        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
366
0
                                                        tablet->data_dir()->path());
367
0
    }
368
369
    // check local disk capacity
370
0
    int64_t tablet_size = tablet->tablet_local_size();
371
0
    if ((*dest_store)->reach_capacity_limit(tablet_size)) {
372
0
        return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}",
373
0
                                             (*dest_store)->path(), tablet_size);
374
0
    }
375
0
    return Status::OK();
376
0
}
377
378
// Return `true` if report success
379
bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info,
380
2.64k
                   std::string_view name) {
381
2.64k
    TMasterResult result;
382
2.64k
    Status status = MasterServerClient::instance()->report(request, &result);
383
2.64k
    if (!status.ok()) [[unlikely]] {
384
0
        LOG_WARNING("failed to report {}", name)
385
0
                .tag("host", cluster_info->master_fe_addr.hostname)
386
0
                .tag("port", cluster_info->master_fe_addr.port)
387
0
                .error(status);
388
0
        return false;
389
0
    }
390
391
2.64k
    else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
392
0
        LOG_WARNING("failed to report {}", name)
393
0
                .tag("host", cluster_info->master_fe_addr.hostname)
394
0
                .tag("port", cluster_info->master_fe_addr.port)
395
0
                .error(result.status);
396
0
        return false;
397
0
    }
398
399
2.64k
    return true;
400
2.64k
}
401
402
Status _submit_task(const TAgentTaskRequest& task,
403
68.4k
                    std::function<Status(const TAgentTaskRequest&)> submit_op) {
404
68.4k
    const TTaskType::type task_type = task.task_type;
405
68.4k
    int64_t signature = task.signature;
406
407
68.4k
    std::string type_str;
408
68.4k
    EnumToString(TTaskType, task_type, type_str);
409
18.4E
    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;
410
411
68.4k
    if (!register_task_info(task_type, signature)) {
412
1.51k
        LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
413
        // Duplicated task request, just return OK
414
1.51k
        return Status::OK();
415
1.51k
    }
416
417
    // TODO(plat1ko): check task request member
418
419
    // Set the receiving time of task so that we can determine whether it is timed out later
420
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
421
    // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok
422
66.9k
    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
423
66.9k
    auto st = submit_op(task);
424
66.9k
    if (!st.ok()) [[unlikely]] {
425
1
        LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature);
426
1
        return st;
427
1
    }
428
429
66.9k
    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
430
66.9k
    return Status::OK();
431
66.9k
}
432
433
bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");
434
435
bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX");
436
bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY");
437
bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD");
438
bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD");
439
bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT");
440
bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT");
441
bvar::Adder<uint64_t> MOVE_count("task", "MOVE");
442
bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION");
443
bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY");
444
bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count("task", "PUSH_INDEX_POLICY");
445
bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF");
446
bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE");
447
bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE");
448
bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION");
449
bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK");
450
bvar::Adder<uint64_t> DELETE_count("task", "DELETE");
451
bvar::Adder<uint64_t> PUSH_count("task", "PUSH");
452
bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO");
453
bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
454
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
455
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
456
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
457
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
458
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");
459
460
134k
void add_task_count(const TAgentTaskRequest& task, int n) {
461
    // clang-format off
462
134k
    switch (task.task_type) {
463
0
    #define ADD_TASK_COUNT(type) \
464
52.7k
    case TTaskType::type:        \
465
105k
        type##_count << n;       \
466
52.7k
        return;
467
1.17k
    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
468
0
    ADD_TASK_COUNT(CHECK_CONSISTENCY)
469
32
    ADD_TASK_COUNT(UPLOAD)
470
44
    ADD_TASK_COUNT(DOWNLOAD)
471
640
    ADD_TASK_COUNT(MAKE_SNAPSHOT)
472
640
    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
473
344
    ADD_TASK_COUNT(MOVE)
474
6
    ADD_TASK_COUNT(COMPACTION)
475
20
    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
476
30
    ADD_TASK_COUNT(PUSH_INDEX_POLICY)
477
4
    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
478
13.2k
    ADD_TASK_COUNT(CREATE)
479
9.38k
    ADD_TASK_COUNT(DROP)
480
5.26k
    ADD_TASK_COUNT(PUBLISH_VERSION)
481
40
    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
482
0
    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
483
0
    ADD_TASK_COUNT(CLONE)
484
0
    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
485
0
    ADD_TASK_COUNT(GC_BINLOG)
486
5.08k
    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
487
16.8k
    ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
488
0
    #undef ADD_TASK_COUNT
489
6.47k
    case TTaskType::REALTIME_PUSH:
490
6.47k
    case TTaskType::PUSH:
491
6.47k
        if (task.push_req.push_type == TPushType::LOAD_V2) {
492
0
            PUSH_count << n;
493
6.47k
        } else if (task.push_req.push_type == TPushType::DELETE) {
494
6.47k
            DELETE_count << n;
495
6.47k
        }
496
6.47k
        return;
497
21.3k
    case TTaskType::ALTER:
498
21.3k
    {
499
21.3k
        ALTER_count << n;
500
        // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
501
21.3k
        if (n > 0) {
502
            // only count fragment when task is actually starting
503
10.6k
            doris::g_fragment_executing_count << 1;
504
10.6k
            int64_t now = duration_cast<std::chrono::milliseconds>(
505
10.6k
                                std::chrono::system_clock::now().time_since_epoch())
506
10.6k
                                .count();
507
10.6k
            g_fragment_last_active_time.set_value(now);
508
10.6k
        }
509
21.3k
        return;
510
6.47k
    }
511
53.7k
    default:
512
53.7k
        return;
513
134k
    }
514
    // clang-format on
515
134k
}
516
517
bvar::Adder<uint64_t> report_task_total("report", "task_total");
518
bvar::Adder<uint64_t> report_task_failed("report", "task_failed");
519
bvar::Adder<uint64_t> report_disk_total("report", "disk_total");
520
bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed");
521
bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total");
522
bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
523
bvar::Adder<uint64_t> report_index_policy_total("report", "index_policy_total");
524
bvar::Adder<uint64_t> report_index_policy_failed("report", "index_policy_failed");
525
526
} // namespace
527
528
TaskWorkerPool::TaskWorkerPool(
529
        std::string_view name, int worker_count,
530
        std::function<void(const TAgentTaskRequest& task)> callback,
531
        std::function<void(const TAgentTaskRequest& task)> pre_submit_callback)
532
152
        : _callback(std::move(callback)), _pre_submit_callback(std::move(pre_submit_callback)) {
533
152
    auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name))
534
152
                      .set_min_threads(worker_count)
535
152
                      .set_max_threads(worker_count)
536
152
                      .build(&_thread_pool);
537
152
    CHECK(st.ok()) << name << ": " << st;
538
152
}
539
540
72
TaskWorkerPool::~TaskWorkerPool() {
541
72
    stop();
542
72
}
543
544
78
void TaskWorkerPool::stop() {
545
78
    if (_stopped.exchange(true)) {
546
6
        return;
547
6
    }
548
549
72
    if (_thread_pool) {
550
72
        _thread_pool->shutdown();
551
72
    }
552
72
}
553
554
68.4k
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
555
68.4k
    return _submit_task(task, [this](auto&& task) {
556
67.2k
        if (_pre_submit_callback) {
557
10.6k
            _pre_submit_callback(task);
558
10.6k
        }
559
67.2k
        add_task_count(task, 1);
560
67.2k
        return _thread_pool->submit_func([this, task]() {
561
67.1k
            _callback(task);
562
67.1k
            add_task_count(task, -1);
563
67.1k
        });
564
67.2k
    });
565
68.4k
}
566
567
PriorTaskWorkerPool::PriorTaskWorkerPool(
568
        const std::string& name, int normal_worker_count, int high_prior_worker_count,
569
        std::function<void(const TAgentTaskRequest& task)> callback)
570
13
        : _callback(std::move(callback)) {
571
50
    for (int i = 0; i < normal_worker_count; ++i) {
572
37
        auto st = Thread::create(
573
37
                "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back());
574
37
        CHECK(st.ok()) << name << ": " << st;
575
37
    }
576
577
50
    for (int i = 0; i < high_prior_worker_count; ++i) {
578
37
        auto st = Thread::create(
579
37
                "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back());
580
37
        CHECK(st.ok()) << name << ": " << st;
581
37
    }
582
13
}
583
584
7
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
585
7
    stop();
586
7
}
587
588
11
void PriorTaskWorkerPool::stop() {
589
11
    {
590
11
        std::lock_guard lock(_mtx);
591
11
        if (_stopped) {
592
4
            return;
593
4
        }
594
595
7
        _stopped = true;
596
7
    }
597
0
    _normal_condv.notify_all();
598
7
    _high_prior_condv.notify_all();
599
600
38
    for (auto&& w : _workers) {
601
38
        if (w) {
602
38
            w->join();
603
38
        }
604
38
    }
605
7
}
606
607
6
Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
608
6
    return _submit_task(task, [this](auto&& task) {
609
6
        auto req = std::make_unique<TAgentTaskRequest>(task);
610
6
        add_task_count(*req, 1);
611
6
        if (req->__isset.priority && req->priority == TPriority::HIGH) {
612
4
            std::lock_guard lock(_mtx);
613
4
            _high_prior_queue.push_back(std::move(req));
614
4
            _high_prior_condv.notify_one();
615
4
            _normal_condv.notify_one();
616
4
        } else {
617
2
            std::lock_guard lock(_mtx);
618
2
            _normal_queue.push_back(std::move(req));
619
2
            _normal_condv.notify_one();
620
2
        }
621
6
        return Status::OK();
622
6
    });
623
6
}
624
625
0
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
626
0
    const TTaskType::type task_type = task.task_type;
627
0
    int64_t signature = task.signature;
628
0
    std::string type_str;
629
0
    EnumToString(TTaskType, task_type, type_str);
630
0
    auto req = std::make_unique<TAgentTaskRequest>(task);
631
632
0
    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
633
0
    do {
634
0
        std::lock_guard lock(s_task_signatures_mtx);
635
0
        auto& set = s_task_signatures[task_type];
636
0
        if (!set.contains(signature)) {
637
            // If it doesn't exist, put it directly into the priority queue
638
0
            add_task_count(*req, 1);
639
0
            set.insert(signature);
640
0
            std::lock_guard temp_lock(_mtx);
641
0
            _high_prior_queue.push_back(std::move(req));
642
0
            _high_prior_condv.notify_one();
643
0
            _normal_condv.notify_one();
644
0
            break;
645
0
        } else {
646
0
            std::lock_guard temp_lock(_mtx);
647
0
            for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
648
                // If it exists in the normal queue, cancel the task in the normal queue
649
0
                if ((*it)->signature == signature) {
650
0
                    _normal_queue.erase(it);                     // cancel the original task
651
0
                    _high_prior_queue.push_back(std::move(req)); // add the new task to the queue
652
0
                    _high_prior_condv.notify_one();
653
0
                    _normal_condv.notify_one();
654
0
                    break;
655
0
                } else {
656
0
                    ++it; // doesn't meet the condition, continue to the next one
657
0
                }
658
0
            }
659
            // If it exists in the high priority queue, no operation is needed
660
0
            LOG_INFO("task has already existed in high prior queue.").tag("signature", signature);
661
0
        }
662
0
    } while (false);
663
664
    // Set the receiving time of task so that we can determine whether it is timed out later
665
0
    task.__set_recv_time(time(nullptr));
666
667
0
    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
668
0
    return Status::OK();
669
0
}
670
671
37
void PriorTaskWorkerPool::normal_loop() {
672
58
    while (true) {
673
40
        std::unique_ptr<TAgentTaskRequest> req;
674
675
40
        {
676
40
            std::unique_lock lock(_mtx);
677
62
            _normal_condv.wait(lock, [&] {
678
62
                return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped;
679
62
            });
680
681
40
            if (_stopped) {
682
19
                return;
683
19
            }
684
685
21
            if (!_high_prior_queue.empty()) {
686
1
                req = std::move(_high_prior_queue.front());
687
1
                _high_prior_queue.pop_front();
688
20
            } else if (!_normal_queue.empty()) {
689
2
                req = std::move(_normal_queue.front());
690
2
                _normal_queue.pop_front();
691
18
            } else {
692
18
                continue;
693
18
            }
694
21
        }
695
696
3
        _callback(*req);
697
3
        add_task_count(*req, -1);
698
3
    }
699
37
}
700
701
37
void PriorTaskWorkerPool::high_prior_loop() {
702
57
    while (true) {
703
39
        std::unique_ptr<TAgentTaskRequest> req;
704
705
39
        {
706
39
            std::unique_lock lock(_mtx);
707
60
            _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; });
708
709
39
            if (_stopped) {
710
19
                return;
711
19
            }
712
713
20
            if (_high_prior_queue.empty()) {
714
0
                continue;
715
0
            }
716
717
20
            req = std::move(_high_prior_queue.front());
718
20
            _high_prior_queue.pop_front();
719
20
        }
720
721
0
        _callback(*req);
722
20
        add_task_count(*req, -1);
723
20
    }
724
37
}
725
726
ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
727
                           std::function<void()> callback)
728
29
        : _name(std::move(name)) {
729
29
    auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] {
730
29
        auto& engine = ExecEnv::GetInstance()->storage_engine();
731
29
        engine.register_report_listener(this);
732
2.75k
        while (true) {
733
2.73k
            {
734
2.73k
                std::unique_lock lock(_mtx);
735
2.73k
                _condv.wait_for(lock, std::chrono::seconds(report_interval_s),
736
5.45k
                                [&] { return _stopped || _signal; });
737
738
2.73k
                if (_stopped) {
739
13
                    break;
740
13
                }
741
742
2.72k
                if (_signal) {
743
                    // Consume received signal
744
59
                    _signal = false;
745
59
                }
746
2.72k
            }
747
748
2.72k
            if (cluster_info->master_fe_addr.port == 0) {
749
                // port == 0 means not received heartbeat yet
750
51
                LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report";
751
51
                continue;
752
51
            }
753
754
2.67k
            callback();
755
2.67k
        }
756
29
        engine.deregister_report_listener(this);
757
29
    };
758
759
29
    auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
760
29
    CHECK(st.ok()) << _name << ": " << st;
761
29
}
762
763
13
ReportWorker::~ReportWorker() {
764
13
    stop();
765
13
}
766
767
61
void ReportWorker::notify() {
768
61
    {
769
61
        std::lock_guard lock(_mtx);
770
61
        _signal = true;
771
61
    }
772
61
    _condv.notify_all();
773
61
}
774
775
14
void ReportWorker::stop() {
776
14
    {
777
14
        std::lock_guard lock(_mtx);
778
14
        if (_stopped) {
779
1
            return;
780
1
        }
781
782
13
        _stopped = true;
783
13
    }
784
0
    _condv.notify_all();
785
13
    if (_thread) {
786
13
        _thread->join();
787
13
    }
788
13
}
789
790
586
void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
791
586
    const auto& alter_inverted_index_rq = req.alter_inverted_index_req;
792
586
    LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature
793
586
              << ", tablet_id=" << alter_inverted_index_rq.tablet_id
794
586
              << ", job_id=" << alter_inverted_index_rq.job_id;
795
796
586
    Status status = Status::OK();
797
586
    auto tablet_ptr = engine.tablet_mgr().get_tablet(alter_inverted_index_rq.tablet_id);
798
586
    if (tablet_ptr != nullptr) {
799
585
        EngineCloudIndexChangeTask engine_task(engine, req.alter_inverted_index_req);
800
585
        status = engine_task.execute();
801
585
    } else {
802
1
        status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
803
1
    }
804
805
    // Return result to fe
806
586
    TFinishTaskRequest finish_task_request;
807
586
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
808
586
    finish_task_request.__set_task_type(req.task_type);
809
586
    finish_task_request.__set_signature(req.signature);
810
586
    if (!status.ok()) {
811
9
        LOG(WARNING) << "[index_change]failed to alter inverted index task, signature="
812
9
                     << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id
813
9
                     << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status;
814
577
    } else {
815
577
        LOG(INFO) << "[index_change]successfully alter inverted index task, signature="
816
577
                  << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id
817
577
                  << ", job_id=" << alter_inverted_index_rq.job_id;
818
577
    }
819
586
    finish_task_request.__set_task_status(status.to_thrift());
820
586
    finish_task(finish_task_request);
821
586
    remove_task_info(req.task_type, req.signature);
822
586
}
823
824
0
void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
825
0
    const auto& alter_inverted_index_rq = req.alter_inverted_index_req;
826
0
    LOG(INFO) << "get alter inverted index task. signature=" << req.signature
827
0
              << ", tablet_id=" << alter_inverted_index_rq.tablet_id
828
0
              << ", job_id=" << alter_inverted_index_rq.job_id;
829
830
0
    Status status = Status::OK();
831
0
    auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id);
832
0
    if (tablet_ptr != nullptr) {
833
0
        EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq);
834
0
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
835
0
        status = engine_task.execute();
836
0
    } else {
837
0
        status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id);
838
0
    }
839
840
    // Return result to fe
841
0
    TFinishTaskRequest finish_task_request;
842
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
843
0
    finish_task_request.__set_task_type(req.task_type);
844
0
    finish_task_request.__set_signature(req.signature);
845
0
    std::vector<TTabletInfo> finish_tablet_infos;
846
0
    if (!status.ok()) {
847
0
        LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature
848
0
                     << ", tablet_id=" << alter_inverted_index_rq.tablet_id
849
0
                     << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status;
850
0
    } else {
851
0
        LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature
852
0
                  << ", tablet_id=" << alter_inverted_index_rq.tablet_id
853
0
                  << ", job_id=" << alter_inverted_index_rq.job_id;
854
0
        TTabletInfo tablet_info;
855
0
        status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id,
856
0
                                 alter_inverted_index_rq.schema_hash, &tablet_info);
857
0
        if (status.ok()) {
858
0
            finish_tablet_infos.push_back(tablet_info);
859
0
        }
860
0
        finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
861
0
    }
862
0
    finish_task_request.__set_task_status(status.to_thrift());
863
0
    finish_task(finish_task_request);
864
0
    remove_task_info(req.task_type, req.signature);
865
0
}
866
867
0
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
868
0
    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;
869
870
0
    Status status;
871
0
    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
872
0
    for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
873
0
        auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
874
0
        if (tablet == nullptr) {
875
0
            status = Status::NotFound("tablet not found");
876
0
            LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
877
0
                         << tablet_meta_info.tablet_id;
878
0
            continue;
879
0
        }
880
0
        bool need_to_save = false;
881
0
        if (tablet_meta_info.__isset.partition_id) {
882
            // for fix partition_id = 0
883
0
            LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
884
0
                         << "partition id from : " << tablet->tablet_meta()->partition_id()
885
0
                         << " to : " << tablet_meta_info.partition_id;
886
0
            auto succ = engine.tablet_manager()->update_tablet_partition_id(
887
0
                    tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
888
0
            if (!succ) {
889
0
                std::string err_msg = fmt::format(
890
0
                        "change be tablet id : {} partition_id : {} failed",
891
0
                        tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
892
0
                LOG(WARNING) << err_msg;
893
0
                status = Status::InvalidArgument(err_msg);
894
0
                continue;
895
0
            }
896
0
            need_to_save = true;
897
0
        }
898
0
        if (tablet_meta_info.__isset.storage_policy_id) {
899
0
            tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
900
0
            need_to_save = true;
901
0
        }
902
0
        if (tablet_meta_info.__isset.is_in_memory) {
903
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
904
0
                    tablet_meta_info.is_in_memory);
905
0
            std::shared_lock rlock(tablet->get_header_lock());
906
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
907
0
                rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
908
0
            }
909
0
            tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
910
0
            need_to_save = true;
911
0
        }
912
0
        if (tablet_meta_info.__isset.compaction_policy) {
913
0
            if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY &&
914
0
                tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) {
915
0
                status = Status::InvalidArgument(
916
0
                        "invalid compaction policy, only support for size_based or "
917
0
                        "time_series");
918
0
                continue;
919
0
            }
920
0
            tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
921
0
            need_to_save = true;
922
0
        }
923
0
        if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
924
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
925
0
                status = Status::InvalidArgument(
926
0
                        "only time series compaction policy support time series config");
927
0
                continue;
928
0
            }
929
0
            tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
930
0
                    tablet_meta_info.time_series_compaction_goal_size_mbytes);
931
0
            need_to_save = true;
932
0
        }
933
0
        if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
934
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
935
0
                status = Status::InvalidArgument(
936
0
                        "only time series compaction policy support time series config");
937
0
                continue;
938
0
            }
939
0
            tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
940
0
                    tablet_meta_info.time_series_compaction_file_count_threshold);
941
0
            need_to_save = true;
942
0
        }
943
0
        if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
944
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
945
0
                status = Status::InvalidArgument(
946
0
                        "only time series compaction policy support time series config");
947
0
                continue;
948
0
            }
949
0
            tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
950
0
                    tablet_meta_info.time_series_compaction_time_threshold_seconds);
951
0
            need_to_save = true;
952
0
        }
953
0
        if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
954
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
955
0
                status = Status::InvalidArgument(
956
0
                        "only time series compaction policy support time series config");
957
0
                continue;
958
0
            }
959
0
            tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
960
0
                    tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
961
0
            need_to_save = true;
962
0
        }
963
0
        if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
964
0
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
965
0
                status = Status::InvalidArgument(
966
0
                        "only time series compaction policy support time series config");
967
0
                continue;
968
0
            }
969
0
            tablet->tablet_meta()->set_time_series_compaction_level_threshold(
970
0
                    tablet_meta_info.time_series_compaction_level_threshold);
971
0
            need_to_save = true;
972
0
        }
973
0
        if (tablet_meta_info.__isset.vertical_compaction_num_columns_per_group) {
974
0
            tablet->tablet_meta()->set_vertical_compaction_num_columns_per_group(
975
0
                    tablet_meta_info.vertical_compaction_num_columns_per_group);
976
0
            need_to_save = true;
977
0
        }
978
0
        if (tablet_meta_info.__isset.replica_id) {
979
0
            tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
980
0
        }
981
0
        if (tablet_meta_info.__isset.binlog_config) {
982
            // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
983
0
            const auto& t_binlog_config = tablet_meta_info.binlog_config;
984
0
            if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
985
0
                !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) {
986
0
                status = Status::InvalidArgument("invalid binlog config, some fields not set");
987
0
                LOG(WARNING) << fmt::format(
988
0
                        "invalid binlog config, some fields not set, tablet_id={}, "
989
0
                        "t_binlog_config={}",
990
0
                        tablet_meta_info.tablet_id,
991
0
                        apache::thrift::ThriftDebugString(t_binlog_config));
992
0
                continue;
993
0
            }
994
995
0
            BinlogConfig new_binlog_config;
996
0
            new_binlog_config = tablet_meta_info.binlog_config;
997
0
            LOG(INFO) << fmt::format(
998
0
                    "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
999
0
                    "new_binlog_config={}",
1000
0
                    tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(),
1001
0
                    new_binlog_config.to_string());
1002
0
            tablet->set_binlog_config(new_binlog_config);
1003
0
            need_to_save = true;
1004
0
        }
1005
0
        if (tablet_meta_info.__isset.enable_single_replica_compaction) {
1006
0
            std::shared_lock rlock(tablet->get_header_lock());
1007
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
1008
0
                    tablet_meta_info.enable_single_replica_compaction);
1009
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1010
0
                rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
1011
0
                        tablet_meta_info.enable_single_replica_compaction);
1012
0
            }
1013
0
            tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
1014
0
                    tablet_meta_info.enable_single_replica_compaction);
1015
0
            need_to_save = true;
1016
0
        }
1017
0
        if (tablet_meta_info.__isset.disable_auto_compaction) {
1018
0
            std::shared_lock rlock(tablet->get_header_lock());
1019
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
1020
0
                    tablet_meta_info.disable_auto_compaction);
1021
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1022
0
                rowset_meta->tablet_schema()->set_disable_auto_compaction(
1023
0
                        tablet_meta_info.disable_auto_compaction);
1024
0
            }
1025
0
            tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
1026
0
                    tablet_meta_info.disable_auto_compaction);
1027
0
            need_to_save = true;
1028
0
        }
1029
1030
0
        if (tablet_meta_info.__isset.skip_write_index_on_load) {
1031
0
            std::shared_lock rlock(tablet->get_header_lock());
1032
0
            tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
1033
0
                    tablet_meta_info.skip_write_index_on_load);
1034
0
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
1035
0
                rowset_meta->tablet_schema()->set_skip_write_index_on_load(
1036
0
                        tablet_meta_info.skip_write_index_on_load);
1037
0
            }
1038
0
            tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
1039
0
                    tablet_meta_info.skip_write_index_on_load);
1040
0
            need_to_save = true;
1041
0
        }
1042
0
        if (need_to_save) {
1043
0
            std::shared_lock rlock(tablet->get_header_lock());
1044
0
            tablet->save_meta();
1045
0
        }
1046
0
    }
1047
1048
0
    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
1049
0
    if (req.signature != -1) {
1050
0
        TFinishTaskRequest finish_task_request;
1051
0
        finish_task_request.__set_task_status(status.to_thrift());
1052
0
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
1053
0
        finish_task_request.__set_task_type(req.task_type);
1054
0
        finish_task_request.__set_signature(req.signature);
1055
0
        finish_task(finish_task_request);
1056
0
        remove_task_info(req.task_type, req.signature);
1057
0
    }
1058
0
}
1059
1060
0
void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1061
0
    uint32_t checksum = 0;
1062
0
    const auto& check_consistency_req = req.check_consistency_req;
1063
0
    EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id,
1064
0
                                   check_consistency_req.schema_hash, check_consistency_req.version,
1065
0
                                   &checksum);
1066
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
1067
0
    Status status = engine_task.execute();
1068
0
    if (!status.ok()) {
1069
0
        LOG_WARNING("failed to check consistency")
1070
0
                .tag("signature", req.signature)
1071
0
                .tag("tablet_id", check_consistency_req.tablet_id)
1072
0
                .error(status);
1073
0
    } else {
1074
0
        LOG_INFO("successfully check consistency")
1075
0
                .tag("signature", req.signature)
1076
0
                .tag("tablet_id", check_consistency_req.tablet_id)
1077
0
                .tag("checksum", checksum);
1078
0
    }
1079
1080
0
    TFinishTaskRequest finish_task_request;
1081
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1082
0
    finish_task_request.__set_task_type(req.task_type);
1083
0
    finish_task_request.__set_signature(req.signature);
1084
0
    finish_task_request.__set_task_status(status.to_thrift());
1085
0
    finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum));
1086
0
    finish_task_request.__set_request_version(check_consistency_req.version);
1087
1088
0
    finish_task(finish_task_request);
1089
0
    remove_task_info(req.task_type, req.signature);
1090
0
}
1091
1092
904
void report_task_callback(const ClusterInfo* cluster_info) {
1093
904
    TReportRequest request;
1094
904
    if (config::report_random_wait) {
1095
904
        random_sleep(5);
1096
904
    }
1097
904
    request.__isset.tasks = true;
1098
904
    {
1099
904
        std::lock_guard lock(s_task_signatures_mtx);
1100
904
        auto& tasks = request.tasks;
1101
5.46k
        for (auto&& [task_type, signatures] : s_task_signatures) {
1102
5.46k
            auto& set = tasks[task_type];
1103
4.00M
            for (auto&& signature : signatures) {
1104
4.00M
                set.insert(signature);
1105
4.00M
            }
1106
5.46k
        }
1107
904
    }
1108
904
    request.__set_backend(BackendOptions::get_local_backend());
1109
904
    request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num());
1110
904
    bool succ = handle_report(request, cluster_info, "task");
1111
904
    report_task_total << 1;
1112
904
    if (!succ) [[unlikely]] {
1113
0
        report_task_failed << 1;
1114
0
    }
1115
904
}
1116
1117
277
void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
1118
277
    TReportRequest request;
1119
277
    request.__set_backend(BackendOptions::get_local_backend());
1120
277
    request.__isset.disks = true;
1121
1122
277
    std::vector<DataDirInfo> data_dir_infos;
1123
277
    static_cast<void>(engine.get_all_data_dir_info(&data_dir_infos, true /* update */));
1124
1125
315
    for (auto& root_path_info : data_dir_infos) {
1126
315
        TDisk disk;
1127
315
        disk.__set_root_path(root_path_info.path);
1128
315
        disk.__set_path_hash(root_path_info.path_hash);
1129
315
        disk.__set_storage_medium(root_path_info.storage_medium);
1130
315
        disk.__set_disk_total_capacity(root_path_info.disk_capacity);
1131
315
        disk.__set_data_used_capacity(root_path_info.local_used_capacity);
1132
315
        disk.__set_remote_used_capacity(root_path_info.remote_used_capacity);
1133
315
        disk.__set_disk_available_capacity(root_path_info.available);
1134
315
        disk.__set_trash_used_capacity(root_path_info.trash_used_capacity);
1135
315
        disk.__set_used(root_path_info.is_used);
1136
315
        request.disks[root_path_info.path] = disk;
1137
315
    }
1138
277
    request.__set_num_cores(CpuInfo::num_cores());
1139
277
    request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
1140
277
                                                 ? config::pipeline_executor_size
1141
277
                                                 : CpuInfo::num_cores());
1142
277
    bool succ = handle_report(request, cluster_info, "disk");
1143
277
    report_disk_total << 1;
1144
277
    if (!succ) [[unlikely]] {
1145
0
        report_disk_failed << 1;
1146
0
    }
1147
277
}
1148
1149
107
void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
1150
    // Random sleep 1~5 seconds before doing report.
1151
    // In order to avoid the problem that the FE receives many report requests at the same time
1152
    // and can not be processed.
1153
107
    if (config::report_random_wait) {
1154
107
        random_sleep(5);
1155
107
    }
1156
107
    (void)engine; // To be used in the future
1157
1158
107
    TReportRequest request;
1159
107
    request.__set_backend(BackendOptions::get_local_backend());
1160
107
    request.__isset.disks = true;
1161
1162
    // TODO(deardeng): report disk info in cloud mode. And make it more clear
1163
    //                 that report CPU by using a separte report procedure
1164
    //                 or abstracting disk report as "host info report"
1165
107
    request.__set_num_cores(CpuInfo::num_cores());
1166
107
    request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
1167
107
                                                 ? config::pipeline_executor_size
1168
107
                                                 : CpuInfo::num_cores());
1169
107
    bool succ = handle_report(request, cluster_info, "disk");
1170
107
    report_disk_total << 1;
1171
107
    report_disk_failed << !succ;
1172
107
}
1173
1174
133
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
1175
133
    if (config::report_random_wait) {
1176
133
        random_sleep(5);
1177
133
    }
1178
1179
133
    TReportRequest request;
1180
133
    request.__set_backend(BackendOptions::get_local_backend());
1181
133
    request.__isset.tablets = true;
1182
1183
133
    increase_report_version();
1184
133
    uint64_t report_version;
1185
133
    for (int i = 0; i < 5; i++) {
1186
133
        request.tablets.clear();
1187
133
        report_version = s_report_version;
1188
133
        engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
1189
133
        if (report_version == s_report_version) {
1190
133
            break;
1191
133
        }
1192
133
    }
1193
1194
133
    if (report_version < s_report_version) {
1195
        // TODO llj This can only reduce the possibility for report error, but can't avoid it.
1196
        // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
1197
        // report, and the report version has a small probability that it has not been updated in time. When FE
1198
        // receives this report, it is possible to delete the new tablet.
1199
0
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
1200
0
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1201
0
        return;
1202
0
    }
1203
1204
133
    std::map<int64_t, int64_t> partitions_version;
1205
133
    engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
1206
133
    request.__set_partitions_version(std::move(partitions_version));
1207
1208
133
    int64_t max_compaction_score =
1209
133
            std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
1210
133
                     DorisMetrics::instance()->tablet_base_max_compaction_score->value());
1211
133
    request.__set_tablet_max_compaction_score(max_compaction_score);
1212
133
    request.__set_report_version(report_version);
1213
1214
    // report storage policy and resource
1215
133
    auto& storage_policy_list = request.storage_policy;
1216
133
    for (auto [id, version] : get_storage_policy_ids()) {
1217
72
        auto& storage_policy = storage_policy_list.emplace_back();
1218
72
        storage_policy.__set_id(id);
1219
72
        storage_policy.__set_version(version);
1220
72
    }
1221
133
    request.__isset.storage_policy = true;
1222
133
    auto& resource_list = request.resource;
1223
190
    for (auto [id_str, version] : get_storage_resource_ids()) {
1224
190
        auto& resource = resource_list.emplace_back();
1225
190
        int64_t id = -1;
1226
190
        if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
1227
190
            ec != std::errc {}) [[unlikely]] {
1228
0
            LOG(ERROR) << "invalid resource id format: " << id_str;
1229
190
        } else {
1230
190
            resource.__set_id(id);
1231
190
            resource.__set_version(version);
1232
190
        }
1233
190
    }
1234
133
    request.__isset.resource = true;
1235
1236
133
    bool succ = handle_report(request, cluster_info, "tablet");
1237
133
    report_tablet_total << 1;
1238
133
    if (!succ) [[unlikely]] {
1239
0
        report_tablet_failed << 1;
1240
0
    }
1241
133
}
1242
1243
56
void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) {
1244
    // Random sleep 1~5 seconds before doing report.
1245
    // In order to avoid the problem that the FE receives many report requests at the same time
1246
    // and can not be processed.
1247
56
    if (config::report_random_wait) {
1248
56
        random_sleep(5);
1249
56
    }
1250
1251
56
    TReportRequest request;
1252
56
    request.__set_backend(BackendOptions::get_local_backend());
1253
56
    request.__isset.tablets = true;
1254
1255
56
    increase_report_version();
1256
56
    uint64_t report_version;
1257
56
    uint64_t total_num_tablets = 0;
1258
70
    for (int i = 0; i < 5; i++) {
1259
68
        request.tablets.clear();
1260
68
        report_version = s_report_version;
1261
68
        engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets);
1262
68
        if (report_version == s_report_version) {
1263
54
            break;
1264
54
        }
1265
68
    }
1266
1267
56
    if (report_version < s_report_version) {
1268
2
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
1269
2
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
1270
2
        return;
1271
2
    }
1272
1273
54
    request.__set_report_version(report_version);
1274
54
    request.__set_num_tablets(total_num_tablets);
1275
1276
54
    bool succ = handle_report(request, cluster_info, "tablet");
1277
54
    report_tablet_total << 1;
1278
54
    if (!succ) [[unlikely]] {
1279
0
        report_tablet_failed << 1;
1280
0
    }
1281
54
}
1282
1283
16
void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1284
16
    const auto& upload_request = req.upload_req;
1285
1286
16
    LOG(INFO) << "get upload task. signature=" << req.signature
1287
16
              << ", job_id=" << upload_request.job_id;
1288
1289
16
    std::map<int64_t, std::vector<std::string>> tablet_files;
1290
16
    std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1291
16
            engine, env, upload_request.job_id, req.signature, upload_request.broker_addr,
1292
16
            upload_request.broker_prop);
1293
16
    SCOPED_ATTACH_TASK(loader->resource_ctx());
1294
16
    Status status =
1295
16
            loader->init(upload_request.__isset.storage_backend ? upload_request.storage_backend
1296
16
                                                                : TStorageBackendType::type::BROKER,
1297
16
                         upload_request.__isset.location ? upload_request.location : "");
1298
16
    if (status.ok()) {
1299
16
        status = loader->upload(upload_request.src_dest_map, &tablet_files);
1300
16
    }
1301
1302
16
    if (!status.ok()) {
1303
0
        LOG_WARNING("failed to upload")
1304
0
                .tag("signature", req.signature)
1305
0
                .tag("job_id", upload_request.job_id)
1306
0
                .error(status);
1307
16
    } else {
1308
16
        LOG_INFO("successfully upload")
1309
16
                .tag("signature", req.signature)
1310
16
                .tag("job_id", upload_request.job_id);
1311
16
    }
1312
1313
16
    TFinishTaskRequest finish_task_request;
1314
16
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1315
16
    finish_task_request.__set_task_type(req.task_type);
1316
16
    finish_task_request.__set_signature(req.signature);
1317
16
    finish_task_request.__set_task_status(status.to_thrift());
1318
16
    finish_task_request.__set_tablet_files(tablet_files);
1319
1320
16
    finish_task(finish_task_request);
1321
16
    remove_task_info(req.task_type, req.signature);
1322
16
}
1323
1324
22
void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1325
22
    const auto& download_request = req.download_req;
1326
22
    LOG(INFO) << "get download task. signature=" << req.signature
1327
22
              << ", job_id=" << download_request.job_id
1328
22
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
1329
1330
    // TODO: download
1331
22
    std::vector<int64_t> downloaded_tablet_ids;
1332
1333
22
    auto status = Status::OK();
1334
22
    if (download_request.__isset.remote_tablet_snapshots) {
1335
0
        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1336
0
                engine, env, download_request.job_id, req.signature);
1337
0
        SCOPED_ATTACH_TASK(loader->resource_ctx());
1338
0
        status = loader->remote_http_download(download_request.remote_tablet_snapshots,
1339
0
                                              &downloaded_tablet_ids);
1340
22
    } else {
1341
22
        std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>(
1342
22
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
1343
22
                download_request.broker_prop);
1344
22
        SCOPED_ATTACH_TASK(loader->resource_ctx());
1345
22
        status = loader->init(download_request.__isset.storage_backend
1346
22
                                      ? download_request.storage_backend
1347
22
                                      : TStorageBackendType::type::BROKER,
1348
22
                              download_request.__isset.location ? download_request.location : "");
1349
22
        if (status.ok()) {
1350
22
            status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids);
1351
22
        }
1352
22
    }
1353
1354
22
    if (!status.ok()) {
1355
0
        LOG_WARNING("failed to download")
1356
0
                .tag("signature", req.signature)
1357
0
                .tag("job_id", download_request.job_id)
1358
0
                .error(status);
1359
22
    } else {
1360
22
        LOG_INFO("successfully download")
1361
22
                .tag("signature", req.signature)
1362
22
                .tag("job_id", download_request.job_id);
1363
22
    }
1364
1365
22
    TFinishTaskRequest finish_task_request;
1366
22
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1367
22
    finish_task_request.__set_task_type(req.task_type);
1368
22
    finish_task_request.__set_signature(req.signature);
1369
22
    finish_task_request.__set_task_status(status.to_thrift());
1370
22
    finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids);
1371
1372
22
    finish_task(finish_task_request);
1373
22
    remove_task_info(req.task_type, req.signature);
1374
22
}
1375
1376
0
void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1377
0
    const auto& download_request = req.download_req;
1378
0
    LOG(INFO) << "get download task. signature=" << req.signature
1379
0
              << ", job_id=" << download_request.job_id
1380
0
              << ", task detail: " << apache::thrift::ThriftDebugString(download_request);
1381
1382
0
    std::vector<int64_t> transferred_tablet_ids;
1383
1384
0
    auto status = Status::OK();
1385
0
    if (download_request.__isset.remote_tablet_snapshots) {
1386
0
        status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
1387
0
                "remote tablet snapshot is not supported.");
1388
0
    } else {
1389
0
        std::unique_ptr<CloudSnapshotLoader> loader = std::make_unique<CloudSnapshotLoader>(
1390
0
                engine, env, download_request.job_id, req.signature, download_request.broker_addr,
1391
0
                download_request.broker_prop);
1392
0
        SCOPED_ATTACH_TASK(loader->resource_ctx());
1393
0
        status = loader->init(download_request.__isset.storage_backend
1394
0
                                      ? download_request.storage_backend
1395
0
                                      : TStorageBackendType::type::BROKER,
1396
0
                              download_request.__isset.location ? download_request.location : "",
1397
0
                              download_request.vault_id);
1398
0
        if (status.ok()) {
1399
0
            status = loader->download(download_request.src_dest_map, &transferred_tablet_ids);
1400
0
        }
1401
1402
0
        if (!status.ok()) {
1403
0
            LOG_WARNING("failed to download")
1404
0
                    .tag("signature", req.signature)
1405
0
                    .tag("job_id", download_request.job_id)
1406
0
                    .error(status);
1407
0
        } else {
1408
0
            LOG_INFO("successfully download")
1409
0
                    .tag("signature", req.signature)
1410
0
                    .tag("job_id", download_request.job_id);
1411
0
        }
1412
1413
0
        TFinishTaskRequest finish_task_request;
1414
0
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
1415
0
        finish_task_request.__set_task_type(req.task_type);
1416
0
        finish_task_request.__set_signature(req.signature);
1417
0
        finish_task_request.__set_task_status(status.to_thrift());
1418
0
        finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids);
1419
1420
0
        finish_task(finish_task_request);
1421
0
        remove_task_info(req.task_type, req.signature);
1422
0
    }
1423
0
}
1424
1425
320
void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1426
320
    const auto& snapshot_request = req.snapshot_req;
1427
1428
320
    LOG(INFO) << "get snapshot task. signature=" << req.signature;
1429
1430
320
    std::string snapshot_path;
1431
320
    bool allow_incremental_clone = false; // not used
1432
320
    std::vector<std::string> snapshot_files;
1433
320
    Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path,
1434
320
                                                         &allow_incremental_clone);
1435
320
    if (status.ok() && snapshot_request.__isset.list_files) {
1436
        // list and save all snapshot files
1437
        // snapshot_path like: data/snapshot/20180417205230.1.86400
1438
        // we need to add subdir: tablet_id/schema_hash/
1439
320
        std::vector<io::FileInfo> files;
1440
320
        bool exists = true;
1441
320
        io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id,
1442
320
                                    snapshot_request.schema_hash);
1443
320
        status = io::global_local_filesystem()->list(path, true, &files, &exists);
1444
320
        if (status.ok()) {
1445
340
            for (auto& file : files) {
1446
340
                snapshot_files.push_back(file.file_name);
1447
340
            }
1448
318
        }
1449
320
    }
1450
320
    if (!status.ok()) {
1451
0
        LOG_WARNING("failed to make snapshot")
1452
0
                .tag("signature", req.signature)
1453
0
                .tag("tablet_id", snapshot_request.tablet_id)
1454
0
                .tag("version", snapshot_request.version)
1455
0
                .error(status);
1456
320
    } else {
1457
320
        LOG_INFO("successfully make snapshot")
1458
320
                .tag("signature", req.signature)
1459
320
                .tag("tablet_id", snapshot_request.tablet_id)
1460
320
                .tag("version", snapshot_request.version)
1461
320
                .tag("snapshot_path", snapshot_path);
1462
320
    }
1463
1464
320
    TFinishTaskRequest finish_task_request;
1465
320
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1466
320
    finish_task_request.__set_task_type(req.task_type);
1467
320
    finish_task_request.__set_signature(req.signature);
1468
320
    finish_task_request.__set_snapshot_path(snapshot_path);
1469
320
    finish_task_request.__set_snapshot_files(snapshot_files);
1470
320
    finish_task_request.__set_task_status(status.to_thrift());
1471
1472
320
    finish_task(finish_task_request);
1473
320
    remove_task_info(req.task_type, req.signature);
1474
320
}
1475
1476
320
void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1477
320
    const auto& release_snapshot_request = req.release_snapshot_req;
1478
1479
320
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;
1480
1481
320
    const std::string& snapshot_path = release_snapshot_request.snapshot_path;
1482
320
    Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path);
1483
320
    if (!status.ok()) {
1484
0
        LOG_WARNING("failed to release snapshot")
1485
0
                .tag("signature", req.signature)
1486
0
                .tag("snapshot_path", snapshot_path)
1487
0
                .error(status);
1488
320
    } else {
1489
320
        LOG_INFO("successfully release snapshot")
1490
320
                .tag("signature", req.signature)
1491
320
                .tag("snapshot_path", snapshot_path);
1492
320
    }
1493
1494
320
    TFinishTaskRequest finish_task_request;
1495
320
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1496
320
    finish_task_request.__set_task_type(req.task_type);
1497
320
    finish_task_request.__set_signature(req.signature);
1498
320
    finish_task_request.__set_task_status(status.to_thrift());
1499
1500
320
    finish_task(finish_task_request);
1501
320
    remove_task_info(req.task_type, req.signature);
1502
320
}
1503
1504
0
void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
1505
0
    const auto& release_snapshot_request = req.release_snapshot_req;
1506
1507
0
    LOG(INFO) << "get release snapshot task. signature=" << req.signature;
1508
1509
0
    Status status = engine.cloud_snapshot_mgr().release_snapshot(
1510
0
            release_snapshot_request.tablet_id, release_snapshot_request.is_job_completed);
1511
1512
0
    if (!status.ok()) {
1513
0
        LOG_WARNING("failed to release snapshot")
1514
0
                .tag("signature", req.signature)
1515
0
                .tag("tablet_id", release_snapshot_request.tablet_id)
1516
0
                .tag("is_job_completed", release_snapshot_request.is_job_completed)
1517
0
                .error(status);
1518
0
    } else {
1519
0
        LOG_INFO("successfully release snapshot")
1520
0
                .tag("signature", req.signature)
1521
0
                .tag("tablet_id", release_snapshot_request.tablet_id)
1522
0
                .tag("is_job_completed", release_snapshot_request.is_job_completed);
1523
0
    }
1524
1525
0
    TFinishTaskRequest finish_task_request;
1526
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1527
0
    finish_task_request.__set_task_type(req.task_type);
1528
0
    finish_task_request.__set_signature(req.signature);
1529
0
    finish_task_request.__set_task_status(status.to_thrift());
1530
1531
0
    finish_task(finish_task_request);
1532
0
    remove_task_info(req.task_type, req.signature);
1533
0
}
1534
1535
172
void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1536
172
    const auto& move_dir_req = req.move_dir_req;
1537
1538
172
    LOG(INFO) << "get move dir task. signature=" << req.signature
1539
172
              << ", job_id=" << move_dir_req.job_id;
1540
172
    Status status;
1541
172
    auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id);
1542
172
    if (tablet == nullptr) {
1543
0
        status = Status::InvalidArgument("Could not find tablet");
1544
172
    } else {
1545
172
        SnapshotLoader loader(engine, env, move_dir_req.job_id, move_dir_req.tablet_id);
1546
172
        SCOPED_ATTACH_TASK(loader.resource_ctx());
1547
172
        status = loader.move(move_dir_req.src, tablet, true);
1548
172
    }
1549
1550
172
    if (!status.ok()) {
1551
0
        LOG_WARNING("failed to move dir")
1552
0
                .tag("signature", req.signature)
1553
0
                .tag("job_id", move_dir_req.job_id)
1554
0
                .tag("tablet_id", move_dir_req.tablet_id)
1555
0
                .tag("src", move_dir_req.src)
1556
0
                .error(status);
1557
172
    } else {
1558
172
        LOG_INFO("successfully move dir")
1559
172
                .tag("signature", req.signature)
1560
172
                .tag("job_id", move_dir_req.job_id)
1561
172
                .tag("tablet_id", move_dir_req.tablet_id)
1562
172
                .tag("src", move_dir_req.src);
1563
172
    }
1564
1565
172
    TFinishTaskRequest finish_task_request;
1566
172
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1567
172
    finish_task_request.__set_task_type(req.task_type);
1568
172
    finish_task_request.__set_signature(req.signature);
1569
172
    finish_task_request.__set_task_status(status.to_thrift());
1570
1571
172
    finish_task(finish_task_request);
1572
172
    remove_task_info(req.task_type, req.signature);
1573
172
}
1574
1575
0
void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) {
1576
0
    const auto& move_dir_req = req.move_dir_req;
1577
1578
0
    LOG(INFO) << "get move dir task. signature=" << req.signature
1579
0
              << ", job_id=" << move_dir_req.job_id;
1580
1581
0
    Status status = engine.cloud_snapshot_mgr().commit_snapshot(move_dir_req.tablet_id);
1582
0
    if (!status.ok()) {
1583
0
        LOG_WARNING("failed to move dir")
1584
0
                .tag("signature", req.signature)
1585
0
                .tag("job_id", move_dir_req.job_id)
1586
0
                .tag("tablet_id", move_dir_req.tablet_id)
1587
0
                .error(status);
1588
0
    } else {
1589
0
        LOG_INFO("successfully move dir")
1590
0
                .tag("signature", req.signature)
1591
0
                .tag("job_id", move_dir_req.job_id)
1592
0
                .tag("tablet_id", move_dir_req.tablet_id);
1593
0
    }
1594
1595
0
    TFinishTaskRequest finish_task_request;
1596
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1597
0
    finish_task_request.__set_task_type(req.task_type);
1598
0
    finish_task_request.__set_signature(req.signature);
1599
0
    finish_task_request.__set_task_status(status.to_thrift());
1600
1601
0
    finish_task(finish_task_request);
1602
0
    remove_task_info(req.task_type, req.signature);
1603
0
}
1604
1605
0
void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1606
0
    const auto& compaction_req = req.compaction_req;
1607
1608
0
    LOG(INFO) << "get compaction task. signature=" << req.signature
1609
0
              << ", compaction_type=" << compaction_req.type
1610
0
              << ", tablet_id=" << compaction_req.tablet_id;
1611
1612
0
    CompactionType compaction_type;
1613
0
    if (compaction_req.type == "base") {
1614
0
        compaction_type = CompactionType::BASE_COMPACTION;
1615
0
    } else if (compaction_req.type == "cumulative") {
1616
0
        compaction_type = CompactionType::CUMULATIVE_COMPACTION;
1617
0
    } else if (compaction_req.type == "full") {
1618
0
        compaction_type = CompactionType::FULL_COMPACTION;
1619
0
    } else {
1620
0
        LOG(WARNING) << "unknown compaction type: " << compaction_req.type
1621
0
                     << ", tablet_id=" << compaction_req.tablet_id;
1622
0
        return;
1623
0
    }
1624
1625
0
    auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id);
1626
0
    if (tablet_ptr == nullptr) {
1627
0
        LOG(WARNING) << "tablet not found. tablet_id=" << compaction_req.tablet_id;
1628
0
        return;
1629
0
    }
1630
1631
0
    if (compaction_type == CompactionType::FULL_COMPACTION) {
1632
        // Full compaction goes through the dedicated threadpool path (align with
1633
        // compaction_action.cpp _handle_run_compaction). `force=false` keeps the
1634
        // admission under permit limiter, matching the HTTP API default.
1635
0
        tablet_ptr->set_last_full_compaction_schedule_time(UnixMillis());
1636
0
        Status status = engine.submit_compaction_task(tablet_ptr, CompactionType::FULL_COMPACTION,
1637
0
                                                      /*force=*/false, /*eager=*/true,
1638
0
                                                      /*trigger_method=*/1);
1639
0
        if (!status.ok()) {
1640
0
            LOG(WARNING) << "failed to submit full compaction task. tablet_id="
1641
0
                         << tablet_ptr->tablet_id() << ", error=" << status;
1642
0
        }
1643
0
        return;
1644
0
    }
1645
1646
    // base / cumulative
1647
0
    auto* data_dir = tablet_ptr->data_dir();
1648
0
    if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) {
1649
0
        LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id()
1650
0
                     << ", compaction_type=" << compaction_type;
1651
0
        return;
1652
0
    }
1653
1654
0
    Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false);
1655
0
    if (!status.ok()) {
1656
0
        LOG(WARNING) << "failed to submit table compaction task. error=" << status;
1657
0
    }
1658
0
}
1659
1660
void cloud_submit_table_compaction_callback(CloudStorageEngine& engine,
1661
3
                                            const TAgentTaskRequest& req) {
1662
3
    const auto& compaction_req = req.compaction_req;
1663
1664
3
    LOG(INFO) << "get cloud compaction task. signature=" << req.signature
1665
3
              << ", compaction_type=" << compaction_req.type
1666
3
              << ", tablet_id=" << compaction_req.tablet_id;
1667
1668
3
    CompactionType compaction_type;
1669
3
    if (compaction_req.type == "base") {
1670
1
        compaction_type = CompactionType::BASE_COMPACTION;
1671
2
    } else if (compaction_req.type == "cumulative") {
1672
1
        compaction_type = CompactionType::CUMULATIVE_COMPACTION;
1673
1
    } else if (compaction_req.type == "full") {
1674
1
        compaction_type = CompactionType::FULL_COMPACTION;
1675
1
    } else {
1676
0
        LOG(WARNING) << "unknown cloud compaction type: " << compaction_req.type
1677
0
                     << ", tablet_id=" << compaction_req.tablet_id;
1678
0
        return;
1679
0
    }
1680
1681
    // Mirror cloud_compaction_action::_handle_run_compaction: base/cumu needs the
1682
    // delete bitmap synced eagerly, full does not (FullCompaction re-syncs itself).
1683
3
    bool sync_delete_bitmap = compaction_type != CompactionType::FULL_COMPACTION;
1684
3
    auto tablet_res = engine.tablet_mgr().get_tablet(compaction_req.tablet_id,
1685
3
                                                     /*warmup_data=*/false, sync_delete_bitmap);
1686
3
    if (!tablet_res.has_value()) {
1687
0
        LOG(WARNING) << "failed to get cloud tablet. tablet_id=" << compaction_req.tablet_id
1688
0
                     << ", error=" << tablet_res.error();
1689
0
        return;
1690
0
    }
1691
3
    CloudTabletSPtr tablet = std::move(tablet_res).value();
1692
3
    if (tablet == nullptr) {
1693
0
        LOG(WARNING) << "cloud tablet not found. tablet_id=" << compaction_req.tablet_id;
1694
0
        return;
1695
0
    }
1696
1697
3
    switch (compaction_type) {
1698
1
    case CompactionType::BASE_COMPACTION:
1699
1
        tablet->set_last_base_compaction_schedule_time(UnixMillis());
1700
1
        break;
1701
1
    case CompactionType::CUMULATIVE_COMPACTION:
1702
1
        tablet->set_last_cumu_compaction_schedule_time(UnixMillis());
1703
1
        break;
1704
1
    case CompactionType::FULL_COMPACTION:
1705
1
        tablet->set_last_full_compaction_schedule_time(UnixMillis());
1706
1
        break;
1707
0
    default:
1708
0
        break;
1709
3
    }
1710
1711
3
    Status status = engine.submit_compaction_task(tablet, compaction_type,
1712
3
                                                  /*trigger_method=*/1);
1713
3
    if (!status.ok()) {
1714
1
        LOG(WARNING) << "failed to submit cloud compaction task. tablet_id=" << tablet->tablet_id()
1715
1
                     << ", type=" << compaction_req.type << ", error=" << status;
1716
1
    }
1717
3
}
1718
1719
namespace {
1720
1721
34
void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1722
34
    Status st;
1723
34
    io::RemoteFileSystemSPtr fs;
1724
1725
34
    if (!existed_fs) {
1726
        // No such FS instance on BE
1727
34
        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
1728
34
                                            std::to_string(param.id));
1729
34
        if (!res.has_value()) {
1730
8
            st = std::move(res).error();
1731
26
        } else {
1732
26
            fs = std::move(res).value();
1733
26
        }
1734
34
    } else {
1735
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
1736
0
        auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
1737
0
        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
1738
0
        S3ClientConf conf = std::move(new_s3_conf.client_conf);
1739
0
        st = client->reset(conf);
1740
0
        fs = std::move(existed_fs);
1741
0
    }
1742
1743
34
    if (!st.ok()) {
1744
8
        LOG(WARNING) << "update s3 resource failed: " << st;
1745
26
    } else {
1746
26
        LOG_INFO("successfully update s3 resource")
1747
26
                .tag("resource_id", param.id)
1748
26
                .tag("resource_name", param.name);
1749
26
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1750
26
    }
1751
34
}
1752
1753
38
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
1754
38
    Status st;
1755
38
    io::RemoteFileSystemSPtr fs;
1756
38
    std::string root_path =
1757
38
            param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";
1758
1759
38
    if (!existed_fs) {
1760
        // No such FS instance on BE
1761
38
        auto res = io::HdfsFileSystem::create(
1762
38
                param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
1763
38
                std::to_string(param.id), nullptr, std::move(root_path));
1764
38
        if (!res.has_value()) {
1765
16
            st = std::move(res).error();
1766
22
        } else {
1767
22
            fs = std::move(res).value();
1768
22
        }
1769
1770
38
    } else {
1771
0
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
1772
        // TODO(plat1ko): update hdfs conf
1773
0
        fs = std::move(existed_fs);
1774
0
    }
1775
1776
38
    if (!st.ok()) {
1777
16
        LOG(WARNING) << "update hdfs resource failed: " << st;
1778
22
    } else {
1779
22
        LOG_INFO("successfully update hdfs resource")
1780
22
                .tag("resource_id", param.id)
1781
22
                .tag("resource_name", param.name)
1782
22
                .tag("root_path", fs->root_path().string());
1783
22
        put_storage_resource(param.id, {std::move(fs)}, param.version);
1784
22
    }
1785
38
}
1786
1787
} // namespace
1788
1789
10
void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1790
10
    const auto& push_storage_policy_req = req.push_storage_policy_req;
1791
    // refresh resource
1792
72
    for (auto&& param : push_storage_policy_req.resource) {
1793
72
        io::RemoteFileSystemSPtr fs;
1794
72
        if (auto existed_resource = get_storage_resource(param.id); existed_resource) {
1795
0
            if (existed_resource->second >= param.version) {
1796
                // Stale request, ignore
1797
0
                continue;
1798
0
            }
1799
1800
0
            fs = std::move(existed_resource->first.fs);
1801
0
        }
1802
1803
72
        if (param.__isset.s3_storage_param) {
1804
34
            update_s3_resource(param, std::move(fs));
1805
38
        } else if (param.__isset.hdfs_storage_param) {
1806
38
            update_hdfs_resource(param, std::move(fs));
1807
38
        } else {
1808
0
            LOG(WARNING) << "unknown resource=" << param;
1809
0
        }
1810
72
    }
1811
    // drop storage policy
1812
10
    for (auto policy_id : push_storage_policy_req.dropped_storage_policy) {
1813
0
        delete_storage_policy(policy_id);
1814
0
    }
1815
    // refresh storage policy
1816
24
    for (auto&& storage_policy : push_storage_policy_req.storage_policy) {
1817
24
        auto existed_storage_policy = get_storage_policy(storage_policy.id);
1818
24
        if (existed_storage_policy == nullptr ||
1819
24
            existed_storage_policy->version < storage_policy.version) {
1820
24
            auto storage_policy1 = std::make_shared<StoragePolicy>();
1821
24
            storage_policy1->name = storage_policy.name;
1822
24
            storage_policy1->version = storage_policy.version;
1823
24
            storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime;
1824
24
            storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl;
1825
24
            storage_policy1->resource_id = storage_policy.resource_id;
1826
24
            LOG_INFO("successfully update storage policy")
1827
24
                    .tag("storage_policy_id", storage_policy.id)
1828
24
                    .tag("storage_policy", storage_policy1->to_string());
1829
24
            put_storage_policy(storage_policy.id, std::move(storage_policy1));
1830
24
        }
1831
24
    }
1832
10
}
1833
1834
15
void push_index_policy_callback(const TAgentTaskRequest& req) {
1835
15
    const auto& request = req.push_index_policy_req;
1836
15
    doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes(
1837
15
            request.index_policys, request.dropped_index_policys);
1838
15
}
1839
1840
2
void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1841
2
    const auto& push_cooldown_conf_req = req.push_cooldown_conf;
1842
326
    for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
1843
326
        int64_t tablet_id = cooldown_conf.tablet_id;
1844
326
        TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id);
1845
326
        if (tablet == nullptr) {
1846
0
            LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id;
1847
0
            continue;
1848
0
        }
1849
326
        if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term,
1850
326
                                         cooldown_conf.cooldown_replica_id) &&
1851
326
            cooldown_conf.cooldown_replica_id == tablet->replica_id() &&
1852
326
            tablet->tablet_meta()->cooldown_meta_id().initialized()) {
1853
2
            Tablet::async_write_cooldown_meta(tablet);
1854
2
        }
1855
326
    }
1856
2
}
1857
1858
6.60k
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1859
6.60k
    const auto& create_tablet_req = req.create_tablet_req;
1860
6.60k
    RuntimeProfile runtime_profile("CreateTablet");
1861
6.60k
    RuntimeProfile* profile = &runtime_profile;
1862
6.60k
    MonotonicStopWatch watch;
1863
6.60k
    watch.start();
1864
6.60k
    Defer defer = [&] {
1865
6.60k
        auto elapsed_time = static_cast<double>(watch.elapsed_time());
1866
6.60k
        if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
1867
0
#include "common/compile_check_avoid_begin.h"
1868
0
            COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
1869
0
#include "common/compile_check_avoid_end.h"
1870
0
            std::stringstream ss;
1871
0
            profile->pretty_print(&ss);
1872
0
            LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str();
1873
0
        }
1874
6.60k
    };
1875
6.60k
    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
1876
6.60k
    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;
1877
1878
6.60k
    std::vector<TTabletInfo> finish_tablet_infos;
1879
6.60k
    VLOG_NOTICE << "create tablet: " << create_tablet_req;
1880
6.60k
    Status status = engine.create_tablet(create_tablet_req, profile);
1881
6.60k
    if (!status.ok()) {
1882
0
        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
1883
0
        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
1884
0
                .tag("signature", req.signature)
1885
0
                .tag("tablet_id", create_tablet_req.tablet_id)
1886
0
                .error(status);
1887
6.60k
    } else {
1888
6.60k
        increase_report_version();
1889
        // get path hash of the created tablet
1890
6.60k
        TabletSharedPtr tablet;
1891
6.60k
        {
1892
6.60k
            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
1893
6.60k
            tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
1894
6.60k
        }
1895
6.60k
        DCHECK(tablet != nullptr);
1896
6.60k
        TTabletInfo tablet_info;
1897
6.60k
        tablet_info.tablet_id = tablet->tablet_id();
1898
6.60k
        tablet_info.schema_hash = tablet->schema_hash();
1899
6.60k
        tablet_info.version = create_tablet_req.version;
1900
        // Useless but it is a required field in TTabletInfo
1901
6.60k
        tablet_info.version_hash = 0;
1902
6.60k
        tablet_info.row_count = 0;
1903
6.60k
        tablet_info.data_size = 0;
1904
6.60k
        tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
1905
6.60k
        tablet_info.__set_replica_id(tablet->replica_id());
1906
6.60k
        finish_tablet_infos.push_back(tablet_info);
1907
6.60k
        LOG_INFO("successfully create tablet")
1908
6.60k
                .tag("signature", req.signature)
1909
6.60k
                .tag("tablet_id", create_tablet_req.tablet_id);
1910
6.60k
    }
1911
6.60k
    TFinishTaskRequest finish_task_request;
1912
6.60k
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
1913
6.60k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1914
6.60k
    finish_task_request.__set_report_version(s_report_version);
1915
6.60k
    finish_task_request.__set_task_type(req.task_type);
1916
6.60k
    finish_task_request.__set_signature(req.signature);
1917
6.60k
    finish_task_request.__set_task_status(status.to_thrift());
1918
6.60k
    finish_task(finish_task_request);
1919
6.60k
    remove_task_info(req.task_type, req.signature);
1920
6.60k
}
1921
1922
4.69k
void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
1923
4.69k
    const auto& drop_tablet_req = req.drop_tablet_req;
1924
4.69k
    Status status;
1925
4.69k
    auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false);
1926
4.69k
    if (dropped_tablet != nullptr) {
1927
4.66k
        status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id,
1928
4.66k
                                                      drop_tablet_req.replica_id,
1929
4.66k
                                                      drop_tablet_req.is_drop_table_or_partition);
1930
4.66k
    } else {
1931
26
        status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id);
1932
26
    }
1933
4.69k
    if (status.ok()) {
1934
        // if tablet is dropped by fe, then the related txn should also be removed
1935
4.66k
        engine.txn_manager()->force_rollback_tablet_related_txns(
1936
4.66k
                dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id,
1937
4.66k
                dropped_tablet->tablet_uid());
1938
4.66k
        LOG_INFO("successfully drop tablet")
1939
4.66k
                .tag("signature", req.signature)
1940
4.66k
                .tag("tablet_id", drop_tablet_req.tablet_id)
1941
4.66k
                .tag("replica_id", drop_tablet_req.replica_id);
1942
4.66k
    } else {
1943
26
        LOG_WARNING("failed to drop tablet")
1944
26
                .tag("signature", req.signature)
1945
26
                .tag("tablet_id", drop_tablet_req.tablet_id)
1946
26
                .tag("replica_id", drop_tablet_req.replica_id)
1947
26
                .error(status);
1948
26
    }
1949
1950
4.69k
    TFinishTaskRequest finish_task_request;
1951
4.69k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
1952
4.69k
    finish_task_request.__set_task_type(req.task_type);
1953
4.69k
    finish_task_request.__set_signature(req.signature);
1954
4.69k
    finish_task_request.__set_task_status(status.to_thrift());
1955
1956
4.69k
    TTabletInfo tablet_info;
1957
4.69k
    tablet_info.tablet_id = drop_tablet_req.tablet_id;
1958
4.69k
    tablet_info.schema_hash = drop_tablet_req.schema_hash;
1959
4.69k
    tablet_info.version = 0;
1960
    // Useless but it is a required field in TTabletInfo
1961
4.69k
    tablet_info.version_hash = 0;
1962
4.69k
    tablet_info.row_count = 0;
1963
4.69k
    tablet_info.data_size = 0;
1964
1965
4.69k
    finish_task_request.__set_finish_tablet_infos({tablet_info});
1966
4.69k
    LOG_INFO("successfully drop tablet")
1967
4.69k
            .tag("signature", req.signature)
1968
4.69k
            .tag("tablet_id", drop_tablet_req.tablet_id);
1969
1970
4.69k
    finish_task(finish_task_request);
1971
4.69k
    remove_task_info(req.task_type, req.signature);
1972
4.69k
}
1973
1974
0
void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
1975
0
    const auto& drop_tablet_req = req.drop_tablet_req;
1976
    // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe
1977
0
    Defer defer = [&] { remove_task_info(req.task_type, req.signature); };
1978
0
    DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", {
1979
0
        LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
1980
0
                .tag("tablet_id", drop_tablet_req.tablet_id);
1981
0
        return;
1982
0
    });
1983
0
    MonotonicStopWatch watch;
1984
0
    watch.start();
1985
0
    auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
1986
0
    std::ostringstream rowset_ids_stream;
1987
0
    bool found = false;
1988
0
    for (auto& weak_tablet : weak_tablets) {
1989
0
        auto tablet = weak_tablet.lock();
1990
0
        if (tablet == nullptr) {
1991
0
            continue;
1992
0
        }
1993
0
        if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
1994
0
            continue;
1995
0
        }
1996
0
        found = true;
1997
0
        auto clean_rowsets = tablet->get_snapshot_rowset(true);
1998
        // Get first 10 rowset IDs as comma-separated string, just for log
1999
0
        int count = 0;
2000
0
        for (const auto& rowset : clean_rowsets) {
2001
0
            if (count >= 10) break;
2002
0
            if (count > 0) {
2003
0
                rowset_ids_stream << ",";
2004
0
            }
2005
0
            rowset_ids_stream << rowset->rowset_id().to_string();
2006
0
            count++;
2007
0
        }
2008
2009
0
        CloudTablet::recycle_cached_data(clean_rowsets);
2010
0
        break;
2011
0
    }
2012
2013
0
    if (!found) {
2014
0
        LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id
2015
0
                     << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
2016
0
        return;
2017
0
    }
2018
2019
0
    engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
2020
0
    LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
2021
0
              << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost "
2022
0
              << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)";
2023
0
}
2024
2025
22
void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2026
22
    const auto& push_req = req.push_req;
2027
2028
22
    LOG(INFO) << "get push task. signature=" << req.signature
2029
22
              << " push_type=" << push_req.push_type;
2030
22
    std::vector<TTabletInfo> tablet_infos;
2031
2032
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
2033
    // use the arg BackendService_submit_tasks_args.tasks is not const
2034
    // and push_req will be modify, so modify is ok
2035
22
    EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos);
2036
22
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2037
22
    auto status = engine_task.execute();
2038
2039
    // Return result to fe
2040
22
    TFinishTaskRequest finish_task_request;
2041
22
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2042
22
    finish_task_request.__set_task_type(req.task_type);
2043
22
    finish_task_request.__set_signature(req.signature);
2044
22
    if (push_req.push_type == TPushType::DELETE) {
2045
22
        finish_task_request.__set_request_version(push_req.version);
2046
22
    }
2047
2048
22
    if (status.ok()) {
2049
22
        LOG_INFO("successfully execute push task")
2050
22
                .tag("signature", req.signature)
2051
22
                .tag("tablet_id", push_req.tablet_id)
2052
22
                .tag("push_type", push_req.push_type);
2053
22
        increase_report_version();
2054
22
        finish_task_request.__set_finish_tablet_infos(tablet_infos);
2055
22
    } else {
2056
0
        LOG_WARNING("failed to execute push task")
2057
0
                .tag("signature", req.signature)
2058
0
                .tag("tablet_id", push_req.tablet_id)
2059
0
                .tag("push_type", push_req.push_type)
2060
0
                .error(status);
2061
0
    }
2062
22
    finish_task_request.__set_task_status(status.to_thrift());
2063
22
    finish_task_request.__set_report_version(s_report_version);
2064
2065
22
    finish_task(finish_task_request);
2066
22
    remove_task_info(req.task_type, req.signature);
2067
22
}
2068
2069
3.21k
void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2070
3.21k
    const auto& push_req = req.push_req;
2071
2072
3.21k
    LOG(INFO) << "get push task. signature=" << req.signature
2073
3.21k
              << " push_type=" << push_req.push_type;
2074
2075
    // Return result to fe
2076
3.21k
    TFinishTaskRequest finish_task_request;
2077
3.21k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2078
3.21k
    finish_task_request.__set_task_type(req.task_type);
2079
3.21k
    finish_task_request.__set_signature(req.signature);
2080
2081
    // Only support DELETE in cloud mode now
2082
3.21k
    if (push_req.push_type != TPushType::DELETE) {
2083
0
        finish_task_request.__set_task_status(
2084
0
                Status::NotSupported("push_type {} not is supported",
2085
0
                                     std::to_string(push_req.push_type))
2086
0
                        .to_thrift());
2087
0
        return;
2088
0
    }
2089
2090
3.21k
    finish_task_request.__set_request_version(push_req.version);
2091
2092
3.21k
    DorisMetrics::instance()->delete_requests_total->increment(1);
2093
3.21k
    auto st = CloudDeleteTask::execute(engine, req.push_req);
2094
3.21k
    if (st.ok()) {
2095
3.21k
        LOG_INFO("successfully execute push task")
2096
3.21k
                .tag("signature", req.signature)
2097
3.21k
                .tag("tablet_id", push_req.tablet_id)
2098
3.21k
                .tag("push_type", push_req.push_type);
2099
3.21k
        increase_report_version();
2100
3.21k
        auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back();
2101
        // Just need tablet_id
2102
3.21k
        tablet_info.tablet_id = push_req.tablet_id;
2103
3.21k
        finish_task_request.__isset.finish_tablet_infos = true;
2104
3.21k
    } else {
2105
4
        DorisMetrics::instance()->delete_requests_failed->increment(1);
2106
4
        LOG_WARNING("failed to execute push task")
2107
4
                .tag("signature", req.signature)
2108
4
                .tag("tablet_id", push_req.tablet_id)
2109
4
                .tag("push_type", push_req.push_type)
2110
4
                .error(st);
2111
4
    }
2112
2113
3.21k
    finish_task_request.__set_task_status(st.to_thrift());
2114
3.21k
    finish_task_request.__set_report_version(s_report_version);
2115
2116
3.21k
    finish_task(finish_task_request);
2117
3.21k
    remove_task_info(req.task_type, req.signature);
2118
3.21k
}
2119
2120
PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
2121
6
        : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count,
2122
2.63k
                         [this](const TAgentTaskRequest& task) { publish_version_callback(task); }),
2123
6
          _engine(engine) {}
2124
2125
PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
2126
2127
2.63k
void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) {
2128
2.63k
    const auto& publish_version_req = req.publish_version_req;
2129
2.63k
    DorisMetrics::instance()->publish_task_request_total->increment(1);
2130
2.63k
    VLOG_NOTICE << "get publish version task. signature=" << req.signature;
2131
2132
2.63k
    std::set<TTabletId> error_tablet_ids;
2133
2.63k
    std::map<TTabletId, TVersion> succ_tablets;
2134
    // partition_id, tablet_id, publish_version, commit_tso
2135
2.63k
    std::vector<DiscontinuousVersionTablet> discontinuous_version_tablets;
2136
2.63k
    std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
2137
2.63k
    uint32_t retry_time = 0;
2138
2.63k
    Status status;
2139
2.63k
    constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
2140
2.63k
    while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
2141
2.63k
        succ_tablets.clear();
2142
2.63k
        error_tablet_ids.clear();
2143
2.63k
        table_id_to_tablet_id_to_num_delta_rows.clear();
2144
2.63k
        EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
2145
2.63k
                                             &succ_tablets, &discontinuous_version_tablets,
2146
2.63k
                                             &table_id_to_tablet_id_to_num_delta_rows);
2147
2.63k
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2148
2.63k
        status = engine_task.execute();
2149
2.63k
        if (status.ok()) {
2150
2.63k
            break;
2151
2.63k
        }
2152
2153
4
        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
2154
            // there are too many missing versions, it has been be added to async
2155
            // publish task, so no need to retry here.
2156
4
            if (discontinuous_version_tablets.empty()) {
2157
0
                break;
2158
0
            }
2159
4
            LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
2160
2
                                   << "transaction_id: " << publish_version_req.transaction_id;
2161
2162
4
            int64_t time_elapsed = time(nullptr) - req.recv_time;
2163
4
            if (time_elapsed > config::publish_version_task_timeout_s) {
2164
0
                LOG(INFO) << "task elapsed " << time_elapsed
2165
0
                          << " seconds since it is inserted to queue, it is timeout";
2166
0
                break;
2167
0
            }
2168
2169
            // Version not continuous, put to queue and wait pre version publish task execute
2170
4
            PUBLISH_VERSION_count << 1;
2171
4
            auto st = _thread_pool->submit_func([this, req] {
2172
4
                this->publish_version_callback(req);
2173
4
                PUBLISH_VERSION_count << -1;
2174
4
            });
2175
4
            if (!st.ok()) [[unlikely]] {
2176
0
                PUBLISH_VERSION_count << -1;
2177
0
                status = std::move(st);
2178
4
            } else {
2179
4
                return;
2180
4
            }
2181
4
        }
2182
2183
0
        LOG_WARNING("failed to publish version")
2184
0
                .tag("transaction_id", publish_version_req.transaction_id)
2185
0
                .tag("error_tablets_num", error_tablet_ids.size())
2186
0
                .tag("retry_time", retry_time)
2187
0
                .error(status);
2188
0
        ++retry_time;
2189
0
    }
2190
2191
2.63k
    for (auto& item : discontinuous_version_tablets) {
2192
0
        _engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version,
2193
0
                                       publish_version_req.transaction_id, false, item.commit_tso);
2194
0
    }
2195
2.63k
    TFinishTaskRequest finish_task_request;
2196
2.63k
    if (!status.ok()) [[unlikely]] {
2197
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2198
        // if publish failed, return failed, FE will ignore this error and
2199
        // check error tablet ids and FE will also republish this task
2200
0
        LOG_WARNING("failed to publish version")
2201
0
                .tag("signature", req.signature)
2202
0
                .tag("transaction_id", publish_version_req.transaction_id)
2203
0
                .tag("error_tablets_num", error_tablet_ids.size())
2204
0
                .error(status);
2205
2.63k
    } else {
2206
2.63k
        if (!config::disable_auto_compaction &&
2207
2.63k
            (!config::enable_compaction_pause_on_high_memory ||
2208
2.63k
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
2209
16.0k
            for (auto [tablet_id, _] : succ_tablets) {
2210
16.0k
                TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
2211
16.0k
                if (tablet != nullptr) {
2212
16.0k
                    if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
2213
16.0k
                        tablet->published_count.fetch_add(1);
2214
16.0k
                        int64_t published_count = tablet->published_count.load();
2215
16.0k
                        int32_t max_version_config = tablet->max_version_config();
2216
16.0k
                        if (tablet->exceed_version_limit(
2217
16.0k
                                    max_version_config *
2218
16.0k
                                    config::load_trigger_compaction_version_percent / 100) &&
2219
16.0k
                            published_count % 20 == 0) {
2220
0
                            auto st = _engine.submit_compaction_task(
2221
0
                                    tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
2222
0
                            if (!st.ok()) [[unlikely]] {
2223
0
                                LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
2224
0
                                             << ", published=" << published_count << " : " << st;
2225
0
                            } else {
2226
0
                                LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
2227
0
                                          << ", published:" << published_count;
2228
0
                            }
2229
0
                        }
2230
16.0k
                    }
2231
16.0k
                } else {
2232
0
                    LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
2233
0
                }
2234
16.0k
            }
2235
2.63k
        }
2236
2.63k
        int64_t cost_second = time(nullptr) - req.recv_time;
2237
2.63k
        g_publish_version_latency << cost_second;
2238
2.63k
        LOG_INFO("successfully publish version")
2239
2.63k
                .tag("signature", req.signature)
2240
2.63k
                .tag("transaction_id", publish_version_req.transaction_id)
2241
2.63k
                .tag("tablets_num", succ_tablets.size())
2242
2.63k
                .tag("cost(s)", cost_second);
2243
2.63k
    }
2244
2245
2.63k
    status.to_thrift(&finish_task_request.task_status);
2246
2.63k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2247
2.63k
    finish_task_request.__set_task_type(req.task_type);
2248
2.63k
    finish_task_request.__set_signature(req.signature);
2249
2.63k
    finish_task_request.__set_report_version(s_report_version);
2250
2.63k
    finish_task_request.__set_succ_tablets(succ_tablets);
2251
2.63k
    finish_task_request.__set_error_tablet_ids(
2252
2.63k
            std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
2253
2.63k
    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
2254
2.63k
            table_id_to_tablet_id_to_num_delta_rows);
2255
2.63k
    finish_task(finish_task_request);
2256
2.63k
    remove_task_info(req.task_type, req.signature);
2257
2.63k
}
2258
2259
20
void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2260
20
    const auto& clear_transaction_task_req = req.clear_transaction_task_req;
2261
20
    LOG(INFO) << "get clear transaction task. signature=" << req.signature
2262
20
              << ", transaction_id=" << clear_transaction_task_req.transaction_id
2263
20
              << ", partition_id_size=" << clear_transaction_task_req.partition_id.size();
2264
2265
20
    Status status;
2266
2267
20
    if (clear_transaction_task_req.transaction_id > 0) {
2268
        // transaction_id should be greater than zero.
2269
        // If it is not greater than zero, no need to execute
2270
        // the following clear_transaction_task() function.
2271
20
        if (!clear_transaction_task_req.partition_id.empty()) {
2272
2
            engine.clear_transaction_task(clear_transaction_task_req.transaction_id,
2273
2
                                          clear_transaction_task_req.partition_id);
2274
18
        } else {
2275
18
            engine.clear_transaction_task(clear_transaction_task_req.transaction_id);
2276
18
        }
2277
20
        LOG(INFO) << "finish to clear transaction task. signature=" << req.signature
2278
20
                  << ", transaction_id=" << clear_transaction_task_req.transaction_id;
2279
20
    } else {
2280
0
        LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id
2281
0
                     << ". signature= " << req.signature;
2282
0
    }
2283
2284
20
    TFinishTaskRequest finish_task_request;
2285
20
    finish_task_request.__set_task_status(status.to_thrift());
2286
20
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2287
20
    finish_task_request.__set_task_type(req.task_type);
2288
20
    finish_task_request.__set_signature(req.signature);
2289
2290
20
    finish_task(finish_task_request);
2291
20
    remove_task_info(req.task_type, req.signature);
2292
20
}
2293
2294
0
void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2295
0
    int64_t signature = req.signature;
2296
0
    LOG(INFO) << "get alter table task, signature: " << signature;
2297
0
    bool is_task_timeout = false;
2298
0
    if (req.__isset.recv_time) {
2299
0
        int64_t time_elapsed = time(nullptr) - req.recv_time;
2300
0
        if (time_elapsed > config::report_task_interval_seconds * 20) {
2301
0
            LOG(INFO) << "task elapsed " << time_elapsed
2302
0
                      << " seconds since it is inserted to queue, it is timeout";
2303
0
            is_task_timeout = true;
2304
0
        }
2305
0
    }
2306
0
    if (!is_task_timeout) {
2307
0
        TFinishTaskRequest finish_task_request;
2308
0
        TTaskType::type task_type = req.task_type;
2309
0
        alter_tablet(engine, req, signature, task_type, &finish_task_request);
2310
0
        finish_task(finish_task_request);
2311
0
    }
2312
0
    doris::g_fragment_executing_count << -1;
2313
0
    int64_t now = duration_cast<std::chrono::milliseconds>(
2314
0
                          std::chrono::system_clock::now().time_since_epoch())
2315
0
                          .count();
2316
0
    g_fragment_last_active_time.set_value(now);
2317
0
    remove_task_info(req.task_type, req.signature);
2318
0
}
2319
2320
10.6k
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2321
10.6k
    int64_t signature = req.signature;
2322
10.6k
    LOG(INFO) << "get alter table task, signature: " << signature;
2323
10.6k
    bool is_task_timeout = false;
2324
10.6k
    if (req.__isset.recv_time) {
2325
10.6k
        int64_t time_elapsed = time(nullptr) - req.recv_time;
2326
10.6k
        if (time_elapsed > config::report_task_interval_seconds * 20) {
2327
0
            LOG(INFO) << "task elapsed " << time_elapsed
2328
0
                      << " seconds since it is inserted to queue, it is timeout";
2329
0
            is_task_timeout = true;
2330
0
        }
2331
10.6k
    }
2332
10.6k
    if (!is_task_timeout) {
2333
10.6k
        TFinishTaskRequest finish_task_request;
2334
10.6k
        TTaskType::type task_type = req.task_type;
2335
10.6k
        alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request);
2336
10.6k
        finish_task(finish_task_request);
2337
10.6k
    }
2338
10.6k
    doris::g_fragment_executing_count << -1;
2339
10.6k
    int64_t now = duration_cast<std::chrono::milliseconds>(
2340
10.6k
                          std::chrono::system_clock::now().time_since_epoch())
2341
10.6k
                          .count();
2342
10.6k
    g_fragment_last_active_time.set_value(now);
2343
2344
    // Clean up alter_version before remove_task_info to avoid race:
2345
    // remove_task_info allows same-signature re-submit, whose pre_submit_callback
2346
    // would set alter_version, then this cleanup would wipe it.
2347
10.6k
    if (req.__isset.alter_tablet_req_v2) {
2348
10.6k
        const auto& alter_req = req.alter_tablet_req_v2;
2349
10.6k
        auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
2350
10.6k
        auto base_tablet = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
2351
10.6k
        if (new_tablet.has_value()) {
2352
10.6k
            new_tablet.value()->set_alter_version(-1);
2353
10.6k
        }
2354
10.6k
        if (base_tablet.has_value()) {
2355
10.6k
            base_tablet.value()->set_alter_version(-1);
2356
10.6k
        }
2357
10.6k
    }
2358
2359
10.6k
    remove_task_info(req.task_type, req.signature);
2360
10.6k
}
2361
2362
10.6k
void set_alter_version_before_enqueue(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2363
10.6k
    if (!req.__isset.alter_tablet_req_v2) {
2364
0
        return;
2365
0
    }
2366
10.6k
    const auto& alter_req = req.alter_tablet_req_v2;
2367
10.6k
    if (alter_req.alter_version <= 1) {
2368
5.43k
        return;
2369
5.43k
    }
2370
5.24k
    auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
2371
5.24k
    if (!new_tablet.has_value() || new_tablet.value()->tablet_state() == TABLET_RUNNING) {
2372
0
        return;
2373
0
    }
2374
5.24k
    auto base_tablet = engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
2375
5.24k
    if (!base_tablet.has_value()) {
2376
0
        return;
2377
0
    }
2378
5.24k
    new_tablet.value()->set_alter_version(alter_req.alter_version);
2379
5.24k
    base_tablet.value()->set_alter_version(alter_req.alter_version);
2380
5.24k
    LOG(INFO) << "set alter_version=" << alter_req.alter_version
2381
5.24k
              << " before enqueue, base_tablet=" << alter_req.base_tablet_id
2382
5.24k
              << ", new_tablet=" << alter_req.new_tablet_id;
2383
5.24k
}
2384
2385
0
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2386
0
    std::unordered_map<int64_t, int64_t> gc_tablet_infos;
2387
0
    if (!req.__isset.gc_binlog_req) {
2388
0
        LOG(WARNING) << "gc binlog task is not valid";
2389
0
        return;
2390
0
    }
2391
0
    if (!req.gc_binlog_req.__isset.tablet_gc_binlog_infos) {
2392
0
        LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid";
2393
0
        return;
2394
0
    }
2395
2396
0
    const auto& tablet_gc_binlog_infos = req.gc_binlog_req.tablet_gc_binlog_infos;
2397
0
    for (auto&& tablet_info : tablet_gc_binlog_infos) {
2398
        // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash);
2399
0
        gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version);
2400
0
    }
2401
2402
0
    engine.gc_binlogs(gc_tablet_infos);
2403
0
}
2404
2405
2.54k
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2406
2.54k
    const TVisibleVersionReq& visible_version_req = req.visible_version_req;
2407
2.54k
    engine.tablet_manager()->update_partitions_visible_version(
2408
2.54k
            visible_version_req.partition_version);
2409
2.54k
}
2410
2411
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
2412
0
                    const TAgentTaskRequest& req) {
2413
0
    const auto& clone_req = req.clone_req;
2414
2415
0
    DorisMetrics::instance()->clone_requests_total->increment(1);
2416
0
    LOG(INFO) << "get clone task. signature=" << req.signature;
2417
2418
0
    std::vector<TTabletInfo> tablet_infos;
2419
0
    EngineCloneTask engine_task(engine, clone_req, cluster_info, req.signature, &tablet_infos);
2420
0
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2421
0
    auto status = engine_task.execute();
2422
    // Return result to fe
2423
0
    TFinishTaskRequest finish_task_request;
2424
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2425
0
    finish_task_request.__set_task_type(req.task_type);
2426
0
    finish_task_request.__set_signature(req.signature);
2427
0
    finish_task_request.__set_task_status(status.to_thrift());
2428
2429
0
    if (!status.ok()) {
2430
0
        DorisMetrics::instance()->clone_requests_failed->increment(1);
2431
0
        LOG_WARNING("failed to clone tablet")
2432
0
                .tag("signature", req.signature)
2433
0
                .tag("tablet_id", clone_req.tablet_id)
2434
0
                .error(status);
2435
0
    } else {
2436
0
        LOG_INFO("successfully clone tablet")
2437
0
                .tag("signature", req.signature)
2438
0
                .tag("tablet_id", clone_req.tablet_id)
2439
0
                .tag("copy_size", engine_task.get_copy_size())
2440
0
                .tag("copy_time_ms", engine_task.get_copy_time_ms());
2441
2442
0
        if (engine_task.is_new_tablet()) {
2443
0
            increase_report_version();
2444
0
            finish_task_request.__set_report_version(s_report_version);
2445
0
        }
2446
0
        finish_task_request.__set_finish_tablet_infos(tablet_infos);
2447
0
        finish_task_request.__set_copy_size(engine_task.get_copy_size());
2448
0
        finish_task_request.__set_copy_time_ms(engine_task.get_copy_time_ms());
2449
0
    }
2450
2451
0
    finish_task(finish_task_request);
2452
0
    remove_task_info(req.task_type, req.signature);
2453
0
}
2454
2455
0
void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2456
0
    const auto& storage_medium_migrate_req = req.storage_medium_migrate_req;
2457
2458
    // check request and get info
2459
0
    TabletSharedPtr tablet;
2460
0
    DataDir* dest_store = nullptr;
2461
2462
0
    auto status = check_migrate_request(engine, storage_medium_migrate_req, tablet, &dest_store);
2463
0
    if (status.ok()) {
2464
0
        EngineStorageMigrationTask engine_task(engine, tablet, dest_store);
2465
0
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2466
0
        status = engine_task.execute();
2467
0
    }
2468
    // fe should ignore this err
2469
0
    if (status.is<FILE_ALREADY_EXIST>()) {
2470
0
        status = Status::OK();
2471
0
    }
2472
0
    if (!status.ok()) {
2473
0
        LOG_WARNING("failed to migrate storage medium")
2474
0
                .tag("signature", req.signature)
2475
0
                .tag("tablet_id", storage_medium_migrate_req.tablet_id)
2476
0
                .error(status);
2477
0
    } else {
2478
0
        LOG_INFO("successfully migrate storage medium")
2479
0
                .tag("signature", req.signature)
2480
0
                .tag("tablet_id", storage_medium_migrate_req.tablet_id);
2481
0
    }
2482
2483
0
    TFinishTaskRequest finish_task_request;
2484
0
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2485
0
    finish_task_request.__set_task_type(req.task_type);
2486
0
    finish_task_request.__set_signature(req.signature);
2487
0
    finish_task_request.__set_task_status(status.to_thrift());
2488
2489
0
    finish_task(finish_task_request);
2490
0
    remove_task_info(req.task_type, req.signature);
2491
0
}
2492
2493
8.45k
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
2494
8.45k
    std::vector<TTabletId> error_tablet_ids;
2495
8.45k
    std::vector<TTabletId> succ_tablet_ids;
2496
8.45k
    Status status;
2497
8.45k
    error_tablet_ids.clear();
2498
8.45k
    const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req;
2499
8.45k
    CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids,
2500
8.45k
                                                &succ_tablet_ids);
2501
8.45k
    SCOPED_ATTACH_TASK(engine_task.mem_tracker());
2502
8.45k
    if (req.signature != calc_delete_bitmap_req.transaction_id) {
2503
        // transaction_id may not be the same as req.signature, so add a log here
2504
0
        LOG_INFO("begin to execute calc delete bitmap task")
2505
0
                .tag("signature", req.signature)
2506
0
                .tag("transaction_id", calc_delete_bitmap_req.transaction_id);
2507
0
    }
2508
8.45k
    status = engine_task.execute();
2509
2510
8.45k
    TFinishTaskRequest finish_task_request;
2511
8.45k
    if (!status) {
2512
0
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
2513
0
        LOG_WARNING("failed to calculate delete bitmap")
2514
0
                .tag("signature", req.signature)
2515
0
                .tag("transaction_id", calc_delete_bitmap_req.transaction_id)
2516
0
                .tag("error_tablets_num", error_tablet_ids.size())
2517
0
                .error(status);
2518
0
    }
2519
2520
8.45k
    status.to_thrift(&finish_task_request.task_status);
2521
8.45k
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
2522
8.45k
    finish_task_request.__set_task_type(req.task_type);
2523
8.45k
    finish_task_request.__set_signature(req.signature);
2524
8.45k
    finish_task_request.__set_report_version(s_report_version);
2525
8.45k
    finish_task_request.__set_error_tablet_ids(error_tablet_ids);
2526
8.45k
    finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions);
2527
2528
8.45k
    finish_task(finish_task_request);
2529
8.45k
    remove_task_info(req.task_type, req.signature);
2530
8.45k
}
2531
2532
void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
2533
26.8k
                                              const TAgentTaskRequest& req) {
2534
26.8k
    if (!config::enable_cloud_make_rs_visible_on_be) {
2535
0
        return;
2536
0
    }
2537
26.8k
    LOG(INFO) << "begin to make cloud tmp rs visible, txn_id="
2538
26.8k
              << req.make_cloud_tmp_rs_visible_req.txn_id
2539
26.8k
              << ", tablet_count=" << req.make_cloud_tmp_rs_visible_req.tablet_ids.size();
2540
2541
26.8k
    const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req;
2542
26.8k
    auto& tablet_mgr = engine.tablet_mgr();
2543
2544
26.8k
    int64_t txn_id = make_visible_req.txn_id;
2545
26.8k
    int64_t version_update_time_ms = make_visible_req.__isset.version_update_time_ms
2546
26.8k
                                             ? make_visible_req.version_update_time_ms
2547
26.8k
                                             : 0;
2548
2549
    // Process each tablet involved in this transaction on this BE
2550
172k
    for (int64_t tablet_id : make_visible_req.tablet_ids) {
2551
172k
        auto tablet_result =
2552
172k
                tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false,
2553
172k
                                      /* sync_delete_bitmap */ false,
2554
172k
                                      /* sync_stats */ nullptr, /* force_use_only_cached */ true,
2555
172k
                                      /* cache_on_miss */ false);
2556
172k
        if (!tablet_result.has_value()) {
2557
0
            continue;
2558
0
        }
2559
172k
        auto cloud_tablet = tablet_result.value();
2560
2561
172k
        int64_t partition_id = cloud_tablet->partition_id();
2562
172k
        auto version_iter = make_visible_req.partition_version_map.find(partition_id);
2563
172k
        if (version_iter == make_visible_req.partition_version_map.end()) {
2564
65
            continue;
2565
65
        }
2566
172k
        int64_t visible_version = version_iter->second;
2567
172k
        DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", {
2568
172k
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
2569
172k
            auto target_table_id = dp->param<int64_t>("table_id", -1);
2570
172k
            auto version = dp->param<int64_t>("version", -1);
2571
172k
            if ((target_tablet_id == tablet_id || target_table_id == cloud_tablet->table_id()) &&
2572
172k
                version == visible_version) {
2573
172k
                DBUG_BLOCK
2574
172k
            }
2575
172k
        });
2576
172k
        cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version,
2577
172k
                                                    version_update_time_ms);
2578
172k
    }
2579
26.8k
    LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id
2580
26.8k
              << ", processed_tablets=" << make_visible_req.tablet_ids.size();
2581
26.8k
}
2582
2583
0
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
2584
0
    LOG(INFO) << "clean trash start";
2585
0
    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
2586
0
    static_cast<void>(engine.start_trash_sweep(nullptr, true));
2587
0
    static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
2588
0
    LOG(INFO) << "clean trash finish";
2589
0
}
2590
2591
4
void clean_udf_cache_callback(const TAgentTaskRequest& req) {
2592
4
    const auto& clean_req = req.clean_udf_cache_req;
2593
2594
4
    if (doris::config::enable_java_support) {
2595
4
        static_cast<void>(Jni::Util::clean_udf_class_load_cache(clean_req.function_signature));
2596
4
    }
2597
2598
4
    if (clean_req.__isset.function_id && clean_req.function_id > 0) {
2599
0
        UserFunctionCache::instance()->drop_function_cache(clean_req.function_id);
2600
0
        PythonServerManager::instance().clear_udaf_state_cache(clean_req.function_id);
2601
0
    }
2602
2603
4
    LOG(INFO) << "clean udf cache finish: function_signature=" << clean_req.function_signature;
2604
4
}
2605
2606
1.17k
void report_index_policy_callback(const ClusterInfo* cluster_info) {
2607
1.17k
    TReportRequest request;
2608
1.17k
    auto& index_policy_list = request.index_policy;
2609
1.17k
    const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys();
2610
20.7k
    for (const auto& policy : policys) {
2611
20.7k
        index_policy_list.emplace_back(policy.second);
2612
20.7k
    }
2613
1.17k
    request.__isset.index_policy = true;
2614
1.17k
    request.__set_backend(BackendOptions::get_local_backend());
2615
1.17k
    bool succ = handle_report(request, cluster_info, "index_policy");
2616
1.17k
    report_index_policy_total << 1;
2617
1.17k
    if (!succ) [[unlikely]] {
2618
0
        report_index_policy_failed << 1;
2619
0
    }
2620
1.17k
}
2621
2622
} // namespace doris